# Load Athletes

In [None]:
from pyspark.sql import SparkSession

# Get the active Spark session, creating one if none exists yet.
spark = SparkSession.builder.appName('CSV Processing').getOrCreate()

# Path of the raw athletes extract.
Athletes_loc = "Files/athletes.csv"

# Header row supplies the column names, types are inferred, and the quote
# character doubles as the escape character.
Raw_Athletes = (
    spark.read
    .options(header=True, inferSchema=True, quote='"', escape='"')
    .csv(Athletes_loc)
)

## Created Athletes Dimension
### 1. Added Age Column 
### 2. Dropped some Columns
### 3. Changed Data Type of 'code','height' and 'weight' Column

In [None]:
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, IntegerType

# Start from the athlete attributes needed for the dimension; birth_date is
# kept only long enough to derive the age column.
Athletes_dim = Raw_Athletes.select(
    'code', 'name', 'gender', 'function', 'country_code', 'country',
    'nationality_code', 'nationality', 'height', 'weight', 'birth_date',
)

# Year-based age: year of the reference date minus the birth year.
Athletes_dim = Athletes_dim.withColumn(
    'Agein2024',
    F.year(F.lit('2024-07-26')) - F.year(F.col('birth_date')),
)

# Remove the helper column and the code columns that are not published.
Athletes_dim = Athletes_dim.drop('birth_date', 'country_code', 'nationality_code')

# 'code' is stored as text, 'height' and 'weight' as integers.
Athletes_dim = (
    Athletes_dim
    .withColumn("code", F.col("code").cast("string"))
    .withColumn("height", F.col("height").cast("int"))
    .withColumn("weight", F.col("weight").cast("int"))
)

display(Athletes_dim)

# Replace the Delta table with the freshly built dimension.
Athletes_dim.write.format("delta").mode("overwrite").saveAsTable("athletes_dim")

## Created Athletes Disciplines Dimension
### 1. Removing 
####    - Brackets []
####    - Quotes ' 
####    - Double Quotes "
### 2. Splitting the Cleaned Column by Commas
### 3. Trimming Whitespace from Each Element
### 4. Unpivoting the Array into Individual Rows
### 5. Changed Data Type of 'code' and 'discipline' Column

In [None]:
from pyspark.sql import functions as F
from pyspark.sql.types import StringType

# Only the athlete code and the raw disciplines list are needed here.
Athletes_Dis_dim = Raw_Athletes.select('code', 'disciplines')

# Strip brackets, single quotes and double quotes from the list text.
df_cleaned = Athletes_Dis_dim.withColumn(
    "cleaned_disciplines",
    F.regexp_replace("disciplines", r"[\[\]\'\"]", ""),
)

# Turn the comma-separated text into an array.
df_split = df_cleaned.withColumn(
    "disciplines_array",
    F.split(F.col("cleaned_disciplines"), ","),
)

# Trim surrounding whitespace from every array element.
df_trimmed = df_split.withColumn(
    "disciplines_array",
    F.expr("transform(disciplines_array, x -> trim(x))"),
)

# One output row per (athlete code, discipline).
Athletes_Dis_dim = df_trimmed.select(
    "code",
    F.explode(F.col("disciplines_array")).alias("discipline"),
)

# Both columns are stored as text.
Athletes_Dis_dim = (
    Athletes_Dis_dim
    .withColumn("code", F.col("code").cast("string"))
    .withColumn("discipline", F.col("discipline").cast("string"))
)

display(Athletes_Dis_dim)

Athletes_Dis_dim.write.format("delta").mode("overwrite").saveAsTable("athletes_dis_dim")

## Created Athletes Language Dimension
### 1. Splitting the Language Column by Commas
### 2. Trimming Whitespace from Each Element
### 3. Unpivoting the Array into Individual Rows
### 4. Changed Data Type of 'code' and 'Language' Column

In [None]:
from pyspark.sql import functions as F

# Only the athlete code and the raw comma-separated language list are needed.
Athletes_Lang_dim = Raw_Athletes['code', 'lang']

# Turn the comma-separated text into an array.
Athletes_Lang_dim_split = Athletes_Lang_dim.withColumn(
    "Language_Array",
    F.split(Athletes_Lang_dim["lang"], ","),
)

# Trim every element IN PLACE. The trimmed array must overwrite
# "Language_Array": the previous code wrote it to a misspelled column
# ("Langauage_Array"), so the explode below still read the untrimmed values
# and languages kept their leading spaces.
Athletes_Lang_dim_trim = Athletes_Lang_dim_split.withColumn(
    "Language_Array",
    F.expr("transform(Language_Array, x-> trim(x))"),
)

# One output row per (athlete code, language).
Athletes_Language_dim = Athletes_Lang_dim_trim.withColumn(
    "Language",
    F.explode(Athletes_Lang_dim_trim["Language_Array"]),
).select('code', 'Language')

# Both columns are stored as text. The type is given by name so this cell does
# not rely on StringType having been imported by an earlier cell.
Athletes_Language_dim = (
    Athletes_Language_dim
    .withColumn("code", F.col("code").cast("string"))
    .withColumn("Language", F.col("Language").cast("string"))
)

display(Athletes_Language_dim)

Athletes_Language_dim.write.mode("overwrite").format("delta").saveAsTable("athletes_lang_dim")

## Created Athletes Events Dimension
### 1. Removing
#### - Brackets []
#### - Double Quotes "
### 2. Splitting the Events Column by Commas
### 3. Trimming Whitespace from Each Element
### 4. Unpivoting the Array into Individual Rows
### 5. Removing only the leading and trailing 
#### - Single Quotes '
### 6. Changed Data Type of 'code' and 'events' Column

In [None]:
from pyspark.sql import functions as F
from pyspark.sql.types import StringType

# Only the athlete code and the raw events list are needed here.
Athletes_Events_dim = Raw_Athletes.select('code', 'events')

# Strip brackets and double quotes. Single quotes are left in place at this
# stage; only leading/trailing ones are removed after the explode.
Athletes_Events_dim_Cleaned = Athletes_Events_dim.withColumn(
    'events_cleaned',
    F.regexp_replace("events", r"[\]\[\"]", ""),
)

# Turn the comma-separated text into an array.
Athletes_Events_dim_Split = Athletes_Events_dim_Cleaned.withColumn(
    'events_split',
    F.split(F.col('events_cleaned'), ","),
)

# Trim surrounding whitespace from every array element.
Athletes_Events_dim_Trim = Athletes_Events_dim_Split.withColumn(
    "events_trim",
    F.expr("transform(events_split, x -> trim(x))"),
)

# One output row per (athlete code, event).
Athletes_Events_dim = Athletes_Events_dim_Trim.select(
    'code',
    F.explode(F.col("events_trim")).alias("Events"),
)

# Remove a single quote only when it is the first or the last character.
Athletes_Events_dim = Athletes_Events_dim.select(
    'code',
    F.regexp_replace("Events", r"(^[\']|[\']$)", "").alias("Events"),
)

# Both columns are stored as text.
Athletes_Events_dim = (
    Athletes_Events_dim
    .withColumn("code", F.col("code").cast("string"))
    .withColumn("Events", F.col("Events").cast("string"))
)

display(Athletes_Events_dim)

Athletes_Events_dim.write.format("delta").mode("overwrite").saveAsTable("athletes_events_dim")

# Done with Athletes :)
# Now Load Coaches

In [None]:
from pyspark.sql import SparkSession

# Get the active Spark session, creating one if none exists yet.
spark = SparkSession.builder.appName('CSV Processing').getOrCreate()

# Path of the raw coaches extract.
Coaches_Loc = "Files/coaches.csv"

# Header row supplies the column names, types are inferred, and the quote
# character doubles as the escape character.
Raw_Coaches = (
    spark.read
    .options(header=True, inferSchema=True, quote='"', escape='"')
    .csv(Coaches_Loc)
)

display(Raw_Coaches)

## Created Coaches Dimension
### 1. Added Age Column
### 2. Removed Unnecessary Columns
### 3. Changed Data type of 'code' column


In [None]:
from pyspark.sql import functions as F
from pyspark.sql.types import StringType

# Year-based age: year of the reference date minus the birth year.
# The reference date now matches the one used for athletes and medallists
# ('2024-07-26'); only the year is read, so the resulting values are unchanged.
Coaches_dim = Raw_Coaches.withColumn(
    "Agein2024",
    F.year(F.lit('2024-07-26')) - F.year('birth_date'),
)

# Keep only the columns published in the coaches dimension.
Coaches_dim = Coaches_dim[
    'code', 'name', 'gender', 'function', 'country_code', 'country',
    'disciplines', 'events', 'birth_date', 'Agein2024'
]

# 'code' is stored as text.
Coaches_dim = Coaches_dim.withColumn('code', F.col('code').cast(StringType()))

display(Coaches_dim)

# Replace the Delta table with the freshly built dimension.
Coaches_dim.write.mode("overwrite").format("delta").saveAsTable("coaches_dim")

# Done with Coaches :)
# Now Load Events

In [None]:
from pyspark.sql import SparkSession

# Get the active Spark session, creating one if none exists yet.
spark = SparkSession.builder.appName('CSV Processing').getOrCreate()

# Path of the raw events extract.
Events_Loc = "Files/events.csv"

# Header row supplies the column names, types are inferred, and the quote
# character doubles as the escape character.
Raw_Events = (
    spark.read
    .options(header=True, inferSchema=True, quote='"', escape='"')
    .csv(Events_Loc)
)

display(Raw_Events)

## Created Events Dimension
### 1. Added 2 Columns:
#### - 'event_code' by merging 'event' column and 'sport_code' column delimited by a space (" ")
#### - 'event_sport' by merging 'event' column and 'sport' column delimited by a space (" ")


In [None]:
from pyspark.sql import functions as F
from pyspark.sql.types import StringType

# 'event_code' = "<event> <sport_code>"; null when either part is null.
Events_dim = Raw_Events.withColumn(
    "event_code",
    F.concat('event', F.lit(" "), 'sport_code'),
)

# 'event_sport' = "<event> <sport>"; null when either part is null.
Events_dim = Events_dim.withColumn(
    "event_sport",
    F.concat('event', F.lit(" "), 'sport'),
)

# Publish the two derived keys first, followed by the source attributes.
Events_dim = Events_dim.select(
    'event_code', 'event_sport', 'event', 'tag', 'sport', 'sport_code', 'sport_url',
)

display(Events_dim)

Events_dim.write.format("delta").mode("overwrite").saveAsTable("events_dim")

# Done with Events :)
# Now Load Teams

In [None]:
from pyspark.sql import SparkSession

# Get the active Spark session, creating one if none exists yet.
spark = SparkSession.builder.appName('CSV Processing').getOrCreate()

# Path of the raw teams extract.
Teams_Loc = "Files/teams.csv"

# Header row supplies the column names, types are inferred, and the quote
# character doubles as the escape character.
Raw_Teams = (
    spark.read
    .options(header=True, inferSchema=True, quote='"', escape='"')
    .csv(Teams_Loc)
)

display(Raw_Teams)

## Created Teams Dimension
### 1. Added 'event_code' column to Teams Dimension by combining 'events' column and 'disciplines_code' column delimited by a space (" ")
### 2. Removed unnecessary columns. 
### 3. Changed data type of 'num_athletes' and 'num_coaches' column


In [None]:
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType

# 'event_code' = "<events> <disciplines_code>"; null when either part is null.
Teams_dim = Raw_Teams.withColumn(
    'event_code',
    F.concat('events', F.lit(" "), 'disciplines_code'),
)

# Keep only the columns published in the teams dimension.
Teams_dim = Teams_dim.select(
    'code', 'team', 'team_gender', 'country_code', 'country', 'event_code',
    'events', 'disciplines_code', 'discipline', 'num_athletes', 'num_coaches',
)

# Head counts are stored as integers.
Teams_dim = (
    Teams_dim
    .withColumn('num_athletes', F.col('num_athletes').cast("int"))
    .withColumn('num_coaches', F.col('num_coaches').cast("int"))
)

display(Teams_dim)

Teams_dim.write.format("delta").mode("overwrite").saveAsTable("teams_dim")

## Created Teams Athletes Names Dimension
### 1. Removing
#### - Brackets []
#### - Double Quotes "
#### - Quotes '
### 2. Splitting the Athletes Column by Commas
### 3. Trimming Whitespace from Each Element
### 4. Unpivoting the Array into Individual Rows
### 5. Added a new Column 'Id' using `monotonically_increasing_id() + 1` (unique, increasing ids; not guaranteed to be consecutive)

In [None]:
from pyspark.sql import functions as F

# Team code plus the raw list of athlete names.
Teams_Athletes = Raw_Teams.select('code', 'athletes')

# Strip brackets, double quotes and single quotes from the list text.
Teams_Athletes_Cleaned = Teams_Athletes.withColumn(
    'athletes',
    F.regexp_replace("athletes", r"[\]\[\"\']", ""),
)

# Turn the comma-separated text into an array.
Teams_Athletes_Split = Teams_Athletes_Cleaned.withColumn(
    'athletes',
    F.split(F.col('athletes'), ","),
)

# Trim surrounding whitespace from every array element.
Teams_Athletes_Trimmed = Teams_Athletes_Split.withColumn(
    'athletes',
    F.expr("transform(athletes, x -> trim(x))"),
)

# One output row per (team code, athlete name).
Teams_Athletes_dim = Teams_Athletes_Trimmed.select(
    'code',
    F.explode(F.col('athletes')).alias('Athletes'),
)

# Generated row id; values are unique and increasing but not necessarily
# consecutive.
Teams_AthleteName_dim = Teams_Athletes_dim.withColumn(
    "Id",
    F.monotonically_increasing_id() + 1,
)

display(Teams_AthleteName_dim)

## Created Teams Athletes id Dimension
### 1. Removing
#### - Brackets []
#### - Double Quotes "
#### - Quotes '
### 2. Splitting the Athletes Codes Column by Commas
### 3. Trimming Whitespace from Each Element
### 4. Unpivoting the Array into Individual Rows
### 5. Added a new Column 'Id' using `monotonically_increasing_id() + 1` (unique, increasing ids; not guaranteed to be consecutive)

In [None]:
from pyspark.sql import functions as F

# Team code plus the raw list of athlete codes.
Teams_Athletes_id = Raw_Teams.select('code', 'athletes_codes')

# Strip brackets, double quotes and single quotes from the list text.
Teams_Athletes_id_Cleaned = Teams_Athletes_id.withColumn(
    'athletes_codes',
    F.regexp_replace("athletes_codes", r"[\]\[\"\']", ""),
)

# Turn the comma-separated text into an array.
Teams_Athletes_id_Split = Teams_Athletes_id_Cleaned.withColumn(
    'athletes_codes',
    F.split(F.col('athletes_codes'), ","),
)

# Trim surrounding whitespace from every array element.
Teams_Athletes_id_Trimmed = Teams_Athletes_id_Split.withColumn(
    'athletes_codes',
    F.expr("transform(athletes_codes, x -> trim(x))"),
)

# One output row per (team code, athlete code).
Teams_Athletes_dim = Teams_Athletes_id_Trimmed.select(
    'code',
    F.explode(F.col('athletes_codes')).alias('Athlete_Code'),
)

# Generated row id; values are unique and increasing but not necessarily
# consecutive.
Teams_Athlete_Id_dim = Teams_Athletes_dim.withColumn(
    "Id",
    F.monotonically_increasing_id() + 1,
)

display(Teams_Athlete_Id_dim)

## Created Team Athletes Dimension
## Before Joining change the Column name of 'code' to 'Code1' from 'Teams_Athlete_Id' Dataframe to remove ambiguity.
### 1. Join These two Dataframes 'Teams_Athlete_Id' and 'Teams_AthleteName_dim' using Inner Join on id column.
### 2. Remove Unnecessary Columns 

In [None]:
from pyspark.sql import functions as F

# Build the (team code, athlete code, athlete name) rows directly from
# Raw_Teams instead of joining two separately exploded Dataframes on a
# generated 'Id'. monotonically_increasing_id() only promises unique,
# increasing values that depend on partitioning, so ids produced for two
# independent Dataframes are not guaranteed to line up; a mismatch would pair
# an athlete code with the wrong name (or drop rows) without raising an error.
# The 'Code1' rename is no longer needed because no join takes place.
Teams_Athletes_Lists = (
    Raw_Teams.select('code', 'athletes_codes', 'athletes')
    # Same cleaning as before: drop brackets/quotes, split on commas, trim.
    .withColumn('athletes_codes', F.split(F.regexp_replace('athletes_codes', r"[\]\[\"\']", ""), ","))
    .withColumn('athletes_codes', F.expr("transform(athletes_codes, x -> trim(x))"))
    .withColumn('athletes', F.split(F.regexp_replace('athletes', r"[\]\[\"\']", ""), ","))
    .withColumn('athletes', F.expr("transform(athletes, x -> trim(x))"))
)

Team_Athletes_dim = (
    Teams_Athletes_Lists
    # arrays_zip pairs the i-th code with the i-th name of the same team row.
    .select('code', F.explode(F.arrays_zip('athletes_codes', 'athletes')).alias('pair'))
    .select('code', 'pair.*')
    # Rename positionally so the result does not depend on the struct field
    # names chosen by arrays_zip.
    .toDF('code', 'Athlete_Code', 'Athletes')
    # arrays_zip pads the shorter list with nulls; keep only complete pairs,
    # which mirrors the inner-join semantics of the previous implementation.
    .where(F.col('Athlete_Code').isNotNull() & F.col('Athletes').isNotNull())
)

display(Team_Athletes_dim)

Team_Athletes_dim.write.mode("overwrite").format("delta").saveAsTable("team_athletes_dim")

## Created Teams Coaches Names Dimension
### 1. Removing
#### - Brackets []
#### - Double Quotes "
#### - Quotes '
### 2. Splitting the Coaches Column by Commas
### 3. Trimming Whitespace from Each Element
### 4. Unpivoting the Array into Individual Rows
### 5. Added a new Column 'Id' using `monotonically_increasing_id() + 1` (unique, increasing ids; not guaranteed to be consecutive)

In [None]:
from pyspark.sql import functions as F

# Team code plus the raw list of coach names.
Teams_Coaches = Raw_Teams.select('code', 'coaches')

# Strip brackets, double quotes and single quotes from the list text.
Teams_Coaches_Cleaned = Teams_Coaches.withColumn(
    'coaches',
    F.regexp_replace("coaches", r"[\]\[\"\']", ""),
)

# Turn the comma-separated text into an array.
Teams_Coaches_Split = Teams_Coaches_Cleaned.withColumn(
    'coaches',
    F.split(F.col('coaches'), ","),
)

# Trim surrounding whitespace from every array element.
Teams_Coaches_Trimmed = Teams_Coaches_Split.withColumn(
    'coaches',
    F.expr("transform(coaches, x -> trim(x))"),
)

# One output row per (team code, coach name).
Teams_Coaches_dim = Teams_Coaches_Trimmed.select(
    'code',
    F.explode(F.col('coaches')).alias('coach'),
)

# Generated row id; values are unique and increasing but not necessarily
# consecutive.
Teams_Coaches_Names_dim = Teams_Coaches_dim.withColumn(
    "Id",
    F.monotonically_increasing_id() + 1,
)

display(Teams_Coaches_Names_dim)

## Created Teams Coaches id Dimension
### 1. Removing
#### - Brackets []
#### - Double Quotes "
#### - Quotes '
### 2. Splitting the Coaches Codes Column by Commas
### 3. Trimming Whitespace from Each Element
### 4. Unpivoting the Array into Individual Rows
### 5. Added a new Column 'Id' using `monotonically_increasing_id() + 1` (unique, increasing ids; not guaranteed to be consecutive)

In [None]:
from pyspark.sql import functions as F

# Team code plus the raw list of coach codes.
Teams_Coaches_id = Raw_Teams.select('code', 'coaches_codes')

# Strip brackets, double quotes and single quotes from the list text.
Teams_Coaches_id_Cleaned = Teams_Coaches_id.withColumn(
    'coaches_codes',
    F.regexp_replace("coaches_codes", r"[\]\[\"\']", ""),
)

# Turn the comma-separated text into an array.
Teams_Coaches_id_Split = Teams_Coaches_id_Cleaned.withColumn(
    'coaches_codes',
    F.split(F.col('coaches_codes'), ","),
)

# Trim surrounding whitespace from every array element.
Teams_Coaches_id_Trimmed = Teams_Coaches_id_Split.withColumn(
    'coaches_codes',
    F.expr("transform(coaches_codes, x -> trim(x))"),
)

# One output row per (team code, coach code).
Teams_Coaches_dim = Teams_Coaches_id_Trimmed.select(
    'code',
    F.explode(F.col('coaches_codes')).alias('coach_code'),
)

# Generated row id; values are unique and increasing but not necessarily
# consecutive.
Teams_Coaches_Id_dim = Teams_Coaches_dim.withColumn(
    "Id",
    F.monotonically_increasing_id() + 1,
)

display(Teams_Coaches_Id_dim)

## Created Team Coaches Dimension
## Before Joining change the Column name of 'code' to 'Code1' from 'Teams_Coaches_Id_dim' Dataframe to remove ambiguity.
### 1. Join These two Dataframes 'Teams_Coaches_Id_dim' and 'Teams_Coaches_dim' using Inner Join on id column.
### 2. Remove Unnecessary Columns 

In [None]:
from pyspark.sql import functions as F

# Build the (team code, coach code, coach name) rows directly from Raw_Teams
# instead of joining two separately exploded Dataframes on a generated 'Id'.
# monotonically_increasing_id() only promises unique, increasing values that
# depend on partitioning, so ids produced for two independent Dataframes are
# not guaranteed to line up; a mismatch would pair a coach code with the wrong
# name (or drop rows) without raising an error.
# The 'Code1' rename is no longer needed because no join takes place.
Teams_Coaches_Lists = (
    Raw_Teams.select('code', 'coaches_codes', 'coaches')
    # Same cleaning as before: drop brackets/quotes, split on commas, trim.
    .withColumn('coaches_codes', F.split(F.regexp_replace('coaches_codes', r"[\]\[\"\']", ""), ","))
    .withColumn('coaches_codes', F.expr("transform(coaches_codes, x -> trim(x))"))
    .withColumn('coaches', F.split(F.regexp_replace('coaches', r"[\]\[\"\']", ""), ","))
    .withColumn('coaches', F.expr("transform(coaches, x -> trim(x))"))
)

Team_Coaches_Joined_dim = (
    Teams_Coaches_Lists
    # arrays_zip pairs the i-th code with the i-th name of the same team row.
    .select('code', F.explode(F.arrays_zip('coaches_codes', 'coaches')).alias('pair'))
    .select('code', 'pair.*')
    # Rename positionally so the result does not depend on the struct field
    # names chosen by arrays_zip.
    .toDF('code', 'coach_code', 'coach')
    # arrays_zip pads the shorter list with nulls; keep only complete pairs,
    # which mirrors the inner-join semantics of the previous implementation.
    .where(F.col('coach_code').isNotNull() & F.col('coach').isNotNull())
)

# Published columns of the team/coach bridge table.
Team_Coaches_dim = Team_Coaches_Joined_dim['code', 'coach_code', 'coach']

display(Team_Coaches_dim)

Team_Coaches_dim.write.mode("overwrite").format("delta").saveAsTable("team_coaches_dim")

# Done With Teams :)
# Now Load Technical Officials

In [None]:
from pyspark.sql import SparkSession

# Get the active Spark session, creating one if none exists yet.
spark = SparkSession.builder.appName('CSV Processing').getOrCreate()

# Path of the raw technical officials extract.
Technical_Officials_Loc = "Files/technical_officials.csv"

# Header row supplies the column names, types are inferred, and the quote
# character doubles as the escape character.
Raw_Technical_Officials = (
    spark.read
    .options(header=True, inferSchema=True, quote='"', escape='"')
    .csv(Technical_Officials_Loc)
)

display(Raw_Technical_Officials)

## Created Technical Officials Dimension
### 1. Removed unnecessary columns. 
### 2. Changed data type of 'code' column


In [None]:
from pyspark.sql import functions as F
from pyspark.sql.types import StringType

# Keep only the columns published in the technical officials dimension.
Technical_Officials_dim = Raw_Technical_Officials.select(
    'code', 'name', 'gender', 'function', 'organisation_code', 'organisation',
)

# 'code' is stored as text.
Technical_Officials_dim = Technical_Officials_dim.withColumn(
    'code',
    F.col('code').cast("string"),
)

display(Technical_Officials_dim)

Technical_Officials_dim.write.format("delta").mode("overwrite").saveAsTable("technical_officials_dim")

## Created Technical Officials Disciplines Dimension
### 1. Removing
#### - Brackets []
#### - Quotes '
### 2. Splitting the disciplines Column by Commas
### 3. Trimming Whitespace from Each Element
### 4. Unpivoting the Array into Individual Rows
### 5. Changed data type of 'code' column

In [None]:
from pyspark.sql import functions as F
from pyspark.sql.types import StringType

# Official code plus the raw list of disciplines.
Technical_Officials_Disciplines_dim = Raw_Technical_Officials.select('code', 'disciplines')

# Strip brackets and single quotes from the list text.
Technical_Officials_Disciplines_dim_Cleaned = Technical_Officials_Disciplines_dim.withColumn(
    'disciplines',
    F.regexp_replace("disciplines", r"[\]\[\']", ""),
)

# Turn the comma-separated text into an array.
Technical_Officials_Disciplines_dim_Split = Technical_Officials_Disciplines_dim_Cleaned.withColumn(
    'disciplines',
    F.split(F.col('disciplines'), ","),
)

# Trim surrounding whitespace from every array element.
Technical_Officials_Disciplines_dim_Trimmed = Technical_Officials_Disciplines_dim_Split.withColumn(
    'disciplines',
    F.expr("transform(disciplines, x -> trim(x))"),
)

# One output row per (official code, discipline).
Technical_Officials_Disciplines_dim = Technical_Officials_Disciplines_dim_Trimmed.select(
    'code',
    F.explode(F.col('disciplines')).alias('discipline'),
)

# 'code' is stored as text.
Technical_Officials_Disciplines_dim = Technical_Officials_Disciplines_dim.withColumn(
    'code',
    F.col('code').cast("string"),
)

display(Technical_Officials_Disciplines_dim)

Technical_Officials_Disciplines_dim.write.format("delta").mode("overwrite").saveAsTable("technical_officials_dis_dim")

# Done With Technical Officials :)
# Now Load Medals Total

In [None]:
from pyspark.sql import SparkSession

# Get the active Spark session, creating one if none exists yet.
spark = SparkSession.builder.appName('CSV Processing').getOrCreate()

# Path of the raw medals-total extract.
Medals_Total_Loc = "Files/medals_total.csv"

# Header row supplies the column names, types are inferred, and the quote
# character doubles as the escape character.
Raw_Medals_Total = (
    spark.read
    .options(header=True, inferSchema=True, quote='"', escape='"')
    .csv(Medals_Total_Loc)
)


# Create Medals Total Fact
### 1. Removed unnecessary columns.

In [None]:
# Keep only the country identifiers and the medal counts.
Medals_Total_Fact = Raw_Medals_Total.select(
    'country_code', 'country', 'Gold_Medal', 'Silver_Medal', 'Bronze_Medal', 'Total',
)

display(Medals_Total_Fact)

# Replace the Delta table with the freshly built fact.
Medals_Total_Fact.write.format("delta").mode("overwrite").saveAsTable("medals_total_fact")

# Done With Medals Total :)
# Now Create Medal Type Dimension from Medals

### 1. Selected only 'medal_type' and 'medal_code' Column
### 2. Dropped all the duplicates along with null rows.
### 3. Changed data type of 'medal_code' Column

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType

# Get the active Spark session, creating one if none exists yet.
spark = SparkSession.builder.appName('CSV Processing').getOrCreate()

# The medal types are derived from the medals extract.
Medal_Types_Loc = "Files/medals.csv"

Raw_Medals = (
    spark.read
    .options(header=True, inferSchema=True, quote='"', escape='"')
    .csv(Medal_Types_Loc)
)

# Distinct (medal_type, medal_code) combinations.
Raw_Medals = Raw_Medals.select('medal_type', 'medal_code').dropDuplicates()

# Discard combinations in which either value is null.
Raw_Medals = Raw_Medals.dropna()

# 'medal_code' is stored as an integer; rows are ordered by it.
Medal_Types_dim = (
    Raw_Medals
    .withColumn('medal_code', F.col('medal_code').cast("int"))
    .orderBy('medal_code')
)

display(Medal_Types_dim)

Medal_Types_dim.write.format("delta").mode("overwrite").saveAsTable("medals_types_dim")

# Done with Medal Type :)
# Now load Medallists

In [None]:
from pyspark.sql import SparkSession

# Get the active Spark session, creating one if none exists yet.
spark = SparkSession.builder.appName('CSV Processing').getOrCreate()

# Path of the raw medallists extract.
Medallists_Loc = "Files/medallists.csv"

# Header row supplies the column names, types are inferred, and the quote
# character doubles as the escape character.
Raw_Medallists = (
    spark.read
    .options(header=True, inferSchema=True, quote='"', escape='"')
    .csv(Medallists_Loc)
)

display(Raw_Medallists)

## Created Medallists Fact
### 1. Removed unnecessary columns.
### 2. Changed data type of 'code_athlete' column
### 3. Added Age Column


In [None]:
from pyspark.sql import functions as F
from pyspark.sql.types import StringType

# Keep only the columns published in the medallist fact.
Medallist_Fact = Raw_Medallists.select(
    'medal_type', 'medal_date', 'name', 'gender', 'country_code', 'country',
    'nationality_code', 'nationality', 'team', 'team_gender', 'discipline',
    'event', 'event_type', 'url_event', 'birth_date', 'code_athlete',
    'code_team', 'is_medallist',
)

# 'code_athlete' is stored as text.
Medallist_Fact = Medallist_Fact.withColumn(
    'code_athlete',
    F.col('code_athlete').cast("string"),
)

# Year-based age: year of the reference date minus the birth year.
Medallist_Fact = Medallist_Fact.withColumn(
    'Agein2024',
    F.year(F.lit('2024-07-26')) - F.year(F.col('birth_date')),
)

display(Medallist_Fact)

Medallist_Fact.write.format("delta").mode("overwrite").saveAsTable("medallist_fact")

# Done with Medallists :)
# Now load Medals


In [None]:
from pyspark.sql import SparkSession

# Get the active Spark session, creating one if none exists yet.
spark = SparkSession.builder.appName('CSV Processing').getOrCreate()

# Path of the raw medals extract.
Medals_Loc = "Files/medals.csv"

# Re-read the full extract: the name Raw_Medals was previously rebound to the
# de-duplicated medal type columns.
Raw_Medals = (
    spark.read
    .options(header=True, inferSchema=True, quote='"', escape='"')
    .csv(Medals_Loc)
)

display(Raw_Medals)

# Create Medal Athletes Fact
## 1. Remove Unnecessary Columns
## 2. Inner Join 'Medal_Athletes_Fact' with 'Athletes_dim' Dataframe on 'code' Column

In [None]:
# Medal attributes kept for the athlete-level medal fact.
Medal_Athletes_Fact = Raw_Medals[
    'medal_type', 'medal_date', 'name', 'gender', 'discipline', 'event',
    'event_type', 'url_event', 'code', 'country_code'
]

# 'name' and 'gender' exist both here and in Athletes_dim. Joining on 'code'
# merges only the key, so the result would carry two 'name' and two 'gender'
# columns, and a table cannot be saved with duplicate column names. Keep the
# medal side's copy and bring over only the athlete columns that the fact does
# not already have (compared case-insensitively, as Spark does by default).
medal_column_names = {name.lower() for name in Medal_Athletes_Fact.columns}
athlete_only_columns = [
    name for name in Athletes_dim.columns if name.lower() not in medal_column_names
]

# Inner join: medal rows whose code has no athlete match are dropped.
Medal_Athletes_Fact = Medal_Athletes_Fact.join(
    Athletes_dim.select('code', *athlete_only_columns),
    on='code',
    how='inner',
)

display(Medal_Athletes_Fact)

Medal_Athletes_Fact.write.mode("overwrite").format("delta").saveAsTable("medal_athletes_fact")

# Create Medal Teams Fact
## 1. Remove Unnecessary Columns
## 2. Inner Join 'Medal_Teams_Fact' with 'Teams_dim' Dataframe on 'code' Column

In [None]:
# Medal attributes kept for the team-level medal fact.
Medal_Teams_Fact = Raw_Medals[
    'medal_type', 'medal_date', 'name', 'gender', 'event_type', 'url_event',
    'code', 'country_code'
]

# 'country_code' exists both here and in Teams_dim. Joining on 'code' merges
# only the key, so the result would carry two 'country_code' columns, and a
# table cannot be saved with duplicate column names. Keep the medal side's copy
# and bring over only the team columns that the fact does not already have
# (compared case-insensitively, as Spark does by default).
medal_team_column_names = {name.lower() for name in Medal_Teams_Fact.columns}
team_only_columns = [
    name for name in Teams_dim.columns if name.lower() not in medal_team_column_names
]

# Inner join: medal rows whose code has no team match are dropped.
Medal_Teams_Fact = Medal_Teams_Fact.join(
    Teams_dim.select('code', *team_only_columns),
    on='code',
    how='inner',
)

display(Medal_Teams_Fact)

Medal_Teams_Fact.write.mode("overwrite").format("delta").saveAsTable("medal_teams_fact")

# Done with Medals :)
# Now load Nocs

In [None]:
from pyspark.sql import SparkSession

# Get the active Spark session, creating one if none exists yet.
spark = SparkSession.builder.appName('CSV Processing').getOrCreate()

# Path of the raw NOCs extract.
Nocs_Loc = "Files/nocs.csv"

# Header row supplies the column names, types are inferred, and the quote
# character doubles as the escape character.
Raw_Nocs = (
    spark.read
    .options(header=True, inferSchema=True, quote='"', escape='"')
    .csv(Nocs_Loc)
)

display(Raw_Nocs)

# Create Nocs Dimension

### 1. Remove Unnecessary Columns

In [None]:
# Keep only the columns published in the NOCs dimension.
Nocs_dim = Raw_Nocs.select('code', 'country', 'tag', 'note')

display(Nocs_dim)

# Replace the Delta table with the freshly built dimension.
Nocs_dim.write.format("delta").mode("overwrite").saveAsTable("nocs_dim")

# Done with Nocs :)
# Now load Schedules


In [None]:
from pyspark.sql import SparkSession

# Get the active Spark session, creating one if none exists yet.
spark = SparkSession.builder.appName('CSV Processing').getOrCreate()

# Path of the raw schedules extract.
Schedules_Loc = "Files/schedules.csv"

# Header row supplies the column names, types are inferred, and the quote
# character doubles as the escape character.
Raw_Schedules = (
    spark.read
    .options(header=True, inferSchema=True, quote='"', escape='"')
    .csv(Schedules_Loc)
)

display(Raw_Schedules)

# Created Schedules Fact
## 1. Created 'time_diff','hours','minutes','seconds','formatted_time_diff' Columns
## 2. Created 'event_code' Column
## 3. Removed Unnecessary Columns

In [None]:
from pyspark.sql import functions as F

# Duration of each scheduled entry: whole seconds between start_date and
# end_date, split into hour / minute / second parts and joined with ':'
# (the parts are not zero-padded).
Schedules_Fact = (
    Raw_Schedules
    .withColumn("time_diff", F.expr("CAST((unix_timestamp(end_date) - unix_timestamp(start_date)) AS INT)"))
    .withColumn("hours", F.floor(F.col("time_diff") / 3600))
    .withColumn("minutes", F.floor((F.col("time_diff") % 3600) / 60))
    .withColumn("seconds", F.col("time_diff") % 60)
    .withColumn("formatted_time_diff", F.concat_ws(":", "hours", "minutes", "seconds"))
)

# 'event_code' = "<event> <discipline_code>"; null when either part is null.
Schedules_Fact = Schedules_Fact.withColumn(
    'event_code',
    F.concat('event', F.lit(" "), 'discipline_code'),
)

# Published columns; 'time_diff' and 'seconds' are helper columns and are not kept.
Schedules_Fact = Schedules_Fact.select(
    'start_date', 'end_date', 'day', 'hours', 'minutes', 'formatted_time_diff',
    'status', 'discipline_code', 'discipline', 'event_code', 'event',
    'event_medal', 'event_type', 'venue', 'venue_code', 'location_code',
    'location_description', 'phase', 'gender', 'url',
)

display(Schedules_Fact)

Schedules_Fact.write.format("delta").mode("overwrite").saveAsTable("schedules_fact")

# Done With Schedules Fact :)
# Now load Venues

In [None]:
from pyspark.sql import SparkSession

# Get the active Spark session, creating one if none exists yet.
spark = SparkSession.builder.appName('CSV Processing').getOrCreate()

# Path of the raw venues extract.
Venues_Loc = "Files/venues.csv"

# Header row supplies the column names, types are inferred, and the quote
# character doubles as the escape character.
Raw_Venues = (
    spark.read
    .options(header=True, inferSchema=True, quote='"', escape='"')
    .csv(Venues_Loc)
)

display(Raw_Venues)

# Create Venues Dimension

## 1. Remove Unnecessary Columns

In [None]:
# Venue attributes without the sports list, which is unpivoted separately.
Venues_Dimension = Raw_Venues.select('venue', 'date_start', 'date_end', 'tag', 'url')

display(Venues_Dimension)


# Create Venues Sports Dimension
### 1. Removing
#### - Brackets []
#### - Quotes '
### 2. Splitting the sports Column by Commas
### 3. Trimming Whitespace from Each Element
### 4. Unpivoting the Array into Individual Rows

In [None]:
from pyspark.sql import functions as F

# Venue name plus the raw list of sports hosted there.
Venues_Sport_Dimension = Raw_Venues.select('venue', 'sports')

# Strip brackets and single quotes from the list text.
Venues_Sport_Dimension_Cleaned = Venues_Sport_Dimension.withColumn(
    'sports',
    F.regexp_replace("sports", r"[\]\[\']", ""),
)

# Turn the comma-separated text into an array.
Venues_Sport_Dimension_Split = Venues_Sport_Dimension_Cleaned.withColumn(
    'sports',
    F.split(F.col('sports'), ","),
)

# Trim surrounding whitespace from every array element.
Venues_Sport_Dimension_Trimmed = Venues_Sport_Dimension_Split.withColumn(
    'sports',
    F.expr("transform(sports, x -> trim(x))"),
)

# One output row per (venue, sport).
Venues_Sport_Dimension = Venues_Sport_Dimension_Trimmed.select(
    'venue',
    F.explode(F.col('sports')).alias('sport'),
)

display(Venues_Sport_Dimension)

# Inner Join 'Venues_Sport_Dimension' with 'Venues_Dimension' Dataframe on 'venue' Column


In [None]:
# Attach every sport to its venue attributes: one row per (venue, sport).
Venues_Dim = Venues_Dimension.join(Venues_Sport_Dimension, 'venue', 'inner')

display(Venues_Dim)

# Replace the Delta table with the freshly built dimension.
Venues_Dim.write.format("delta").mode("overwrite").saveAsTable("venues_dim")

# Done with Venues :)
# Now load Torch Route

In [None]:
from pyspark.sql import SparkSession

# Get the active Spark session, creating one if none exists yet.
spark = SparkSession.builder.appName('CSV Processing').getOrCreate()

# Path of the raw torch route extract.
Torch_Route_Loc = "Files/torch_route.csv"

# Header row supplies the column names, types are inferred, and the quote
# character doubles as the escape character.
Raw_Torch_Route = (
    spark.read
    .options(header=True, inferSchema=True, quote='"', escape='"')
    .csv(Torch_Route_Loc)
)

display(Raw_Torch_Route)

# Create Torch Route Dimension

In [None]:
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType

# All source columns are kept; only 'stage_number' is converted to an integer.
Torch_Route_dim = Raw_Torch_Route.withColumn(
    'stage_number',
    F.col('stage_number').cast("int"),
)

display(Torch_Route_dim)

# Replace the Delta table with the freshly built dimension.
Torch_Route_dim.write.format("delta").mode("overwrite").saveAsTable("torch_route_dim")

# Done with Torch Route :)
# Now load Flags
## 1. Clean column names by replacing spaces and hyphens with underscores

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Get the active Spark session, creating one if none exists yet.
spark = SparkSession.builder.appName("CSV Processing").getOrCreate()

# Path of the raw flags extract.
file_path = "Files/flags_iso.csv" 

Raw_Flags = spark.read.csv(file_path, header=True, inferSchema=True, quote='"', escape='"')

# Rename every column, replacing spaces and hyphens with underscores.
# The names must come from Raw_Flags: the previous code iterated over
# `df.columns`, but no `df` is defined, so the cell failed with a NameError.
# toDF renames positionally, which avoids resolving the original names (they
# may contain characters that are awkward to reference) as column expressions.
Flag_dim = Raw_Flags.toDF(
    *[column.replace(' ', '_').replace('-', '_') for column in Raw_Flags.columns]
)

Flag_dim.write.mode("overwrite").format("delta").saveAsTable("flag_dim")
