In [0]:
%run ./0.Config

### 1. Load raw data

In [0]:
from pyspark.sql.types import StructType,StructField,StringType,IntegerType
from pyspark.sql.functions import monotonically_increasing_id

In [0]:
# Explicit schemas for the five raw CSV inputs, so that column names and types
# are fixed up front instead of being inferred from the files.
# NOTE(review): nullable=False is declared on most fields, but file-based
# readers may relax it to nullable on load - confirm if non-null enforcement matters.

# Athletes: one row per athlete (name, country, discipline).
Athletes_schema = StructType([
    StructField('personName', StringType(), nullable=False),
    StructField('country', StringType(), nullable=False),
    StructField('discipline', StringType(), nullable=False),
])

# Teams: one row per team entry.
Teams_schema = StructType([
    StructField('teamName', StringType(), nullable=False),
    StructField('discipline', StringType(), nullable=False),
    StructField('country', StringType(), nullable=False),
    StructField('event', StringType(), nullable=False),
])

# EntriesGender: per-discipline entry counts.
EntriesGender_schema = StructType([
    StructField('discipline', StringType(), nullable=False),
    StructField('female', IntegerType(), nullable=False),
    StructField('male', IntegerType(), nullable=False),
    StructField('total', IntegerType(), nullable=False),
])

# Coaches: `event` is the only field declared nullable.
Coaches_schema = StructType([
    StructField('name', StringType(), nullable=False),
    StructField('country', StringType(), nullable=False),
    StructField('discipline', StringType(), nullable=False),
    StructField('event', StringType(), nullable=True),
])

# Medals: medal counts and ranks per team/country.
Medals_schema = StructType([
    StructField('rank', IntegerType(), nullable=False),
    StructField('teamCountry', StringType(), nullable=False),
    StructField('gold', IntegerType(), nullable=False),
    StructField('silver', IntegerType(), nullable=False),
    StructField('bronze', IntegerType(), nullable=False),
    StructField('total', IntegerType(), nullable=False),
    StructField('rankByTotal', IntegerType(), nullable=False),
])

In [0]:
# Load every raw CSV with its explicit schema; the first line of each file is
# treated as a header row. The *_raw_path variables are not defined in this
# notebook - presumably they come from the %run of 0.Config (verify there).
Athletes_raw_df = spark.read.csv(Athletes_raw_path, schema=Athletes_schema, header=True)
Coaches_raw_df = spark.read.csv(Coaches_raw_path, schema=Coaches_schema, header=True)
EntriesGender_df = spark.read.csv(EntriesGender_raw_path, schema=EntriesGender_schema, header=True)
Medals_raw_df = spark.read.csv(Medals_raw_path, schema=Medals_schema, header=True)
Teams_raw_df = spark.read.csv(Teams_raw_path, schema=Teams_schema, header=True)

### 2.1. `Discipline` - Process and write to Delta Lake

In [0]:
# Build the discipline dimension: the distinct discipline names found in the
# entries-by-gender data, each tagged with a generated surrogate id.
# monotonically_increasing_id() yields unique but non-consecutive ids.
discipline_df = (
    EntriesGender_df
    .select('discipline')
    .dropDuplicates()
    .withColumn('id', monotonically_increasing_id())
)

# Put the key column first for the persisted table layout.
processed_discipline_df = discipline_df.select('id', 'discipline')



In [0]:
# Persist the discipline dimension as a Delta table, replacing any previous contents.
(
    processed_discipline_df.write
    .format('delta')
    .mode('overwrite')
    .saveAsTable("olympic_tokyo_processed.discipline")
)

### 2.2. `Country` - Process and write to Delta Lake

In [0]:
# Build the country dimension: the distinct teamCountry values found in the
# medals data, each tagged with a generated surrogate id.
# NOTE(review): the country list is taken from the medals file only, so any
# country that does not appear there gets no row here - confirm this covers
# every country referenced by the other datasets.
country_df = (
    Medals_raw_df
    .select('teamCountry')
    .dropDuplicates()
    .withColumn('id', monotonically_increasing_id())
)

# Put the key column first for the persisted table layout.
processed_country_df = country_df.select('id', 'teamCountry')

In [0]:
# Persist the country dimension as a Delta table, replacing any previous contents.
(
    processed_country_df.write
    .format('delta')
    .mode('overwrite')
    .saveAsTable("olympic_tokyo_processed.country")
)

### 2.3. `Athletes` - Process and write to Delta Lake

In [0]:
# Resolve each athlete's discipline and country names to the surrogate ids of
# the dimension tables.
#
# The dimensions are re-read from the tables persisted by the earlier cells
# instead of reusing the lazy processed_discipline_df / processed_country_df:
# their `id` column comes from monotonically_increasing_id(), which is
# non-deterministic, so re-evaluating those plans inside this join could
# assign ids that differ from the ones already saved, leaving the athletes
# table with foreign keys that do not match the dimension tables.
discipline_dim_df = spark.table("olympic_tokyo_processed.discipline")
country_dim_df = spark.table("olympic_tokyo_processed.country")

# Left joins keep every athlete; an athlete whose discipline or country has no
# match in the corresponding dimension ends up with a null id.
Ath_Dis_Cntry_df = (
    Athletes_raw_df
    .join(
        discipline_dim_df,
        Athletes_raw_df['discipline'] == discipline_dim_df['discipline'],
        'left',
    )
    .join(
        country_dim_df,
        Athletes_raw_df['country'] == country_dim_df['teamCountry'],
        'left',
    )
)

# Both dimensions expose a column named `id`, so each one is selected through
# its own DataFrame and aliased to an unambiguous foreign-key name.
processed_athletes_df = Ath_Dis_Cntry_df.select(
    Athletes_raw_df['personName'],
    country_dim_df['id'].alias('countryId'),
    discipline_dim_df['id'].alias('disciplineId'),
)

In [0]:
# Persist the processed athletes as a Delta table, replacing any previous contents.
(
    processed_athletes_df.write
    .format('delta')
    .mode('overwrite')
    .saveAsTable("olympic_tokyo_processed.athletes")
)

### 2.4. `EntriesGender` - Process and write to Delta Lake

In [0]:
# The entries-by-gender data is stored as loaded (no transformation applied),
# replacing any previous table contents.
(
    EntriesGender_df.write
    .format('delta')
    .mode('overwrite')
    .saveAsTable("olympic_tokyo_processed.entriesgender")
)

### 2.5. `Medals` - Process and write to Delta Lake


In [0]:
# The medals data is stored as loaded (no transformation applied),
# replacing any previous table contents.
(
    Medals_raw_df.write
    .format('delta')
    .mode('overwrite')
    .saveAsTable("olympic_tokyo_processed.medals")
)