In [12]:
import os
import pandas as pd
from datetime import datetime, timedelta

In [13]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType


In [14]:
import sys
sys.path.append(r'/home/gabriel/Documents/proj/utils/')

from utils import createSparkSesion
from pyspark.sql import (functions as F,
                                        Window)
import matplotlib.pyplot as plt
import pandas as pd

spark esta no python 3.8.10

In [15]:
# Create the Spark session through the project helper imported from utils.
# NOTE(review): the helper's configuration (master, JDBC driver jars, ...) is
# not visible in this notebook — confirm it ships the PostgreSQL driver used
# by the final write cell.
spark = createSparkSesion()

<pyspark.sql.session.SparkSession object at 0x7f5177fc1730>


In [16]:
# Display the session object to confirm it was created.
spark

In [17]:
# Location of the raw dataset.
# The directory can be overridden with the ATHLETES_DATA_DIR environment
# variable so the notebook is not tied to a single machine; the previously
# hard-coded path is kept as the default, so existing runs behave the same.
prefix_path = os.environ.get(
    "ATHLETES_DATA_DIR",
    "/home/gabriel/Documents/local_datasets/raw/archive",
)
file = "athletes.csv"

In [18]:
# Explicit schema for the athletes CSV: every column is nullable, and the
# order below is the order in which the reader maps the file's columns.
# NOTE(review): height and weight are declared as integers — confirm the file
# has no fractional values, because values that do not parse as the declared
# type are presumably read back as null under the reader's default mode.
schema = StructType([
    StructField(column_name, column_type(), nullable=True)
    for column_name, column_type in (
        ("id",      IntegerType),
        ("name",    StringType),
        ("gender",  StringType),
        ("age",     IntegerType),
        ("height",  IntegerType),
        ("weight",  IntegerType),
        ("team",    StringType),
        ("noc",     StringType),
        ("games",   StringType),
        ("year",    IntegerType),
        ("season",  StringType),
        ("city",    StringType),
        ("sport",   StringType),
        ("event",   StringType),
        ("medal",   StringType),
    )
])

In [19]:
# Load the raw CSV with the explicit schema (the first line of the file is a
# header) and rename the `noc` column to `team_prefix`, the name used by every
# later cell.
eda_df = (
    spark.read
         .schema(schema)
         .option("header", True)
         .csv(f"{prefix_path}/{file}")
         .withColumnRenamed("noc", "team_prefix")
)

In [20]:
# Row count of the raw frame; this triggers a full read of the CSV.
eda_df.count()


                                                                                

271116

In [21]:
# Preview a few rows as a pandas frame.
# NOTE(review): takePandas is not defined in this notebook; presumably it is
# attached to DataFrame by the project utils module — confirm.
eda_df.takePandas()

Unnamed: 0,id,name,gender,age,height,weight,team,team_prefix,games,year,season,city,sport,event,medal
0,1,A Dijiang,M,24,180.0,80.0,China,CHN,1992 Summer,1992,Summer,Barcelona,Basketball,Basketball Men's Basketball,
1,2,A Lamusi,M,23,170.0,60.0,China,CHN,2012 Summer,2012,Summer,London,Judo,Judo Men's Extra-Lightweight,
2,3,Gunnar Nielsen Aaby,M,24,,,Denmark,DEN,1920 Summer,1920,Summer,Antwerpen,Football,Football Men's Football,
3,4,Edgar Lindenau Aabye,M,34,,,Denmark/Sweden,DEN,1900 Summer,1900,Summer,Paris,Tug-Of-War,Tug-Of-War Men's Tug-Of-War,Gold
4,5,Christine Jacoba Aaftink,F,21,185.0,82.0,Netherlands,NED,1988 Winter,1988,Winter,Calgary,Speed Skating,Speed Skating Women's 500 metres,


<h1>Normalization Diagram V1</h1>

<img src="/home/gabriel/Documents/proj/src/imgs/diagramas_athletes_history.png" width="700">


<h1>Creating normalized tables V1</h1>

In [22]:
from pyspark.sql.functions import monotonically_increasing_id

In [23]:
# Gender dimension: translate the raw one-letter code into a label, keep one
# row per label and attach a surrogate key. Any code other than "M"/"F"
# (including null) falls into the "Nao cadastrado" bucket.
# The generated ids are unique but not consecutive.
gender = (
    eda_df
    .select("gender")
    .withColumn(
        "gender",
        F.when(F.col("gender") == "M", "Male")
         .when(F.col("gender") == "F", "Female")
         .otherwise("Nao cadastrado"),
    )
    .distinct()
    .withColumn("gender_id", F.monotonically_increasing_id())
)

In [24]:
# Team dimension: one row per distinct (team, team_prefix) pair plus a
# surrogate key. The generated ids are unique but not consecutive.
team = (
    eda_df
    .select(["team", "team_prefix"])
    .distinct()
    .withColumn("team_id", F.monotonically_increasing_id())
)

In [25]:
# Athlete-level attributes, one row per distinct combination of the columns
# below. team, gender and team_prefix are carried only as join keys; they are
# replaced by surrogate ids when the final athlete table is built.
athlete = eda_df.select(
    [
        "name",
        "team",         # join key, dropped after the team join
        "gender",       # join key, dropped after the gender join
        "team_prefix",  # join key, dropped after the team join
        "height",
        "weight",
        "year",
        "age",
    ]
).distinct()


In [26]:
# Mock a birth date to replace the integer age.
#   1. Deduplicate: keep one row per (name, team, gender) — the appearance with
#      the lowest known age. Null ages are sorted last so a row with a real age
#      wins, and `year` breaks ties between rows with the same age.
#   2. Birth year = games year minus the age at those games.
#   3. Month and day are pseudo-random with fixed seeds. Each column needs its
#      OWN seed: two rand() calls sharing a seed draw from the same stream, which
#      would make day a function of month. Days stay in 1..25 so the date is
#      valid in every month.
#   4. Assemble "YYYY-MM-DD" and cast to a date. When the age is unknown the
#      birth date is left null instead of inventing one.
athlete_deduped_window = (
    Window
    .partitionBy("name", "team", "gender")
    .orderBy(F.col("age").asc_nulls_last(), F.col("year").asc())
)

athlete_deduped = (
   athlete
      .withColumn("dedup_rn", F.row_number().over(athlete_deduped_window))
      .filter(F.col("dedup_rn") == 1)
      .drop("dedup_rn")
      .withColumn("day", F.expr("cast(floor(rand(42) * 25 + 1) as INT)"))
      .withColumn("month", F.expr("cast(floor(rand(43) * 12 + 1) as INT)"))
      .withColumn("birth_year", F.col("year") - F.col("age"))
      .withColumn(
         "birth",
         F.when(
            F.col("birth_year").isNotNull(),
            F.to_date(
               F.format_string(
                  "%04d-%02d-%02d",
                  F.col("birth_year"), F.col("month"), F.col("day"),
               )
            ),
         ),
      )
      # The helper columns (day, month, birth_year, year, age) are not selected.
      .select("name", "team", "team_prefix", "gender", "weight", "height", "birth")
      .withColumn("athlete_id", monotonically_increasing_id())
)


In [27]:
# Final athlete table written to the database: the natural team and gender
# keys are replaced by their surrogate ids.
normalized_athlete = (
   athlete_deduped
      .join(team, on=["team", "team_prefix"], how="inner")
      # Apply the same three-way mapping used to build the `gender` dimension.
      # Mapping every non-"M" value to "Female" would silently file athletes
      # with a missing or unknown code under the wrong gender_id.
      .withColumn(
         "gender",
         F.when(F.col("gender") == "M", "Male")
          .when(F.col("gender") == "F", "Female")
          .otherwise("Nao cadastrado"),
      )
      .join(gender, on=["gender"], how="inner")
      .select("athlete_id", "gender_id", "name", "weight", "height", "birth", "team_id")
)


**TODO:** ver a quantidade de medalhas em cada intervalo de tempo, usando `lag`, na parte da análise.

In [28]:
# City dimension: one row per distinct host city with a surrogate key and a
# placeholder province column.
# NOTE(review): the placeholder text is a string stored in a column named
# province_id and is spelled "avaliable"; it is reproduced unchanged here
# because it ends up in the database — decide whether a null would be better.
city = (
    eda_df
    .select("city")
    .distinct()
    .select(
        "city",
        F.monotonically_increasing_id().alias("city_id"),
        F.lit("not avaliable").alias("province_id"),
    )
)

In [29]:
# Sport dimension: one row per distinct sport plus a surrogate key.
sport = (
    eda_df
    .select("sport")
    .distinct()
    .select("sport", F.monotonically_increasing_id().alias("sport_id"))
)

In [30]:
# Distinct event occurrences: the same event name appears once per
# (city, year, season, sport) combination it was held in.
event = eda_df.select(["city", "year", "season", "sport", "event"]).distinct()

In [31]:
# Event table: give every distinct event occurrence a surrogate key, then swap
# the city and sport names for their ids. The resulting columns are
# event, season, year, event_id, city_id, sport_id.
# NOTE(review): the variable name is misspelled ("nomralized"); it is kept
# because later cells reference it under this name.
nomralized_event = (
    event
    .select(["event", "season", "year", "city", "sport"])
    .distinct()
    .withColumn("event_id", F.monotonically_increasing_id())
    .join(city, on=["city"], how="inner")
    .join(sport, on=["sport"], how="inner")
    .drop("city", "sport", "province_id")
)

In [32]:
# Fact table: one row per participation record, carrying only the record id,
# the medal and the surrogate keys (register_id, medal, athlete_id, team_id,
# event_id).
#
# Both lookups are matched on the full key instead of a single name column:
#   * Events are keyed by (event, season, year). Joining on the event name
#     alone matches every edition in which that event was held and multiplies
#     each record accordingly.
#   * Athletes are deduplicated per (name, team, gender), so a name that appears
#     under several teams has several athlete rows. Resolving team_id first and
#     joining on (name, team_id) picks the athlete row for the record's team.
#
# NOTE(review): like the rest of the notebook this relies on the generated ids
# being the same each time the lazy dimension frames are evaluated; persist or
# write the dimensions first if that cannot be guaranteed. Records whose
# (team, team_prefix) differs from the pair kept by the athlete dedup step do
# not match and are dropped — verify the row count against eda_df.
athletes_history = (
   eda_df
      .join(team, on=["team", "team_prefix"], how="inner")
      .join(
         normalized_athlete.select("athlete_id", "name", "team_id"),
         on=["name", "team_id"],
         how="inner",
      )
      .join(
         nomralized_event.select("event", "season", "year", "event_id"),
         on=["event", "season", "year"],
         how="inner",
      )
      .select(
         F.col("id").alias("register_id"),
         "medal",
         "athlete_id",
         "team_id",
         "event_id",
      )
      .distinct()
)

In [None]:
# Mapping of target database table name -> DataFrame to write.
# Each entry must reference its own frame: the "sport" and "nomralized_event"
# entries previously pointed at normalized_athlete, which wrote the athlete
# data into those tables.
# NOTE(review): the table name "nomralized_event" is misspelled; it is kept so
# the existing database table name does not change — rename deliberately if
# nothing depends on it.
tables_definitions = [
   {"table_name": "athlete" ,           "df_reference" : normalized_athlete},
   {"table_name": "gender" ,            "df_reference" : gender},
   {"table_name": "team" ,              "df_reference" : team},
   {"table_name": "city" ,              "df_reference" : city},
   {"table_name": "sport" ,             "df_reference" : sport},
   {"table_name": "nomralized_event" ,  "df_reference" : nomralized_event},
   {"table_name": "athletes_history" ,  "df_reference" : athletes_history}]

In [41]:
# Sanity check: show the schema of the first registered frame (the athlete table).
tables_definitions[0]['df_reference']

DataFrame[athlete_id: bigint, gender_id: bigint, name: string, weight: int, height: int, birth: date, team_id: bigint]

In [42]:
# Write every registered frame to PostgreSQL over JDBC, replacing any existing
# table of the same name.
#
# Connection settings are read from the environment so no credential is stored
# in the notebook:
#   POSTGRES_JDBC_URL  optional, defaults to the local general_purpose database
#   POSTGRES_USER      optional, defaults to "postgres"
#   POSTGRES_PASSWORD  required; a missing value raises KeyError before any
#                      table is touched
# NOTE(review): the password that used to be hard-coded in this cell has been
# exposed in the notebook history and should be rotated.
jdbc_options = {
   "url": os.environ.get(
      "POSTGRES_JDBC_URL", "jdbc:postgresql://localhost:5432/general_purpose"
   ),
   "user": os.environ.get("POSTGRES_USER", "postgres"),
   "password": os.environ["POSTGRES_PASSWORD"],
   "driver": "org.postgresql.Driver",
}

for definition in tables_definitions:
   (definition['df_reference'].write
    .format("jdbc")
    .options(dbtable=definition['table_name'], **jdbc_options)
    .mode("overwrite")
    .save())

25/05/20 20:47:53 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                