### Subset the dataframe according to schema

In [30]:
import os
from pyspark.sql import SparkSession
import pandas as pd
import numpy as np

In [31]:
# Point JAVA_HOME and SPARK_HOME at the local Java 8 and Spark 3.2.1 installs.
os.environ.update(
    {
        "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
        "SPARK_HOME": "/project/spark-3.2.1-bin-hadoop3.2",
    }
)

In [32]:
# Build (or reuse) the Spark session named "Database"; the PostgreSQL JDBC
# jar is registered through spark.jars.
spark = (
    SparkSession.builder
    .appName("Database")
    .config("spark.jars", "postgresql-42.3.2.jar")
    .getOrCreate()
)

In [33]:
# Load the two driver CSV files from the collection_output folder into pandas.
driver_stats_final, driver_standings_final = (
    pd.read_csv(csv_path)
    for csv_path in (
        '/project/DE-individual/collection_output/driver_stats_final.csv',
        '/project/DE-individual/collection_output/driver_standings_final.csv',
    )
)

In [34]:
# Remove the 'Unnamed: 0' column from both frames (presumably the index column
# written when the CSVs were saved -- confirm against the collection step).
# errors='ignore' together with rebinding keeps this cell re-runnable: the
# previous in-place drop raised KeyError on a second execution because the
# column was already gone.
driver_stats_final = driver_stats_final.drop(columns='Unnamed: 0', errors='ignore')
driver_standings_final = driver_standings_final.drop(columns='Unnamed: 0', errors='ignore')

In [35]:
# Convert both pandas frames into Spark DataFrames.
driver_stats_spark_df, driver_standings_spark_df = map(
    spark.createDataFrame, (driver_stats_final, driver_standings_final)
)

In [36]:
# Print the column names and types Spark inferred for the driver stats frame.
driver_stats_spark_df.printSchema()

root
 |-- id: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- full_name: string (nullable = true)
 |-- birthday: string (nullable = true)
 |-- rookie_year: double (nullable = true)
 |-- gender: string (nullable = true)
 |-- country: string (nullable = true)
 |-- birth_place: string (nullable = true)
 |-- twitter: string (nullable = true)
 |-- height: string (nullable = true)
 |-- weight: string (nullable = true)
 |-- series: string (nullable = true)



In [37]:
# Print the column names and types Spark inferred for the driver standings frame.
driver_standings_spark_df.printSchema()

root
 |-- id: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- full_name: string (nullable = true)
 |-- status: string (nullable = true)
 |-- rank: long (nullable = true)
 |-- points: long (nullable = true)
 |-- starts: long (nullable = true)
 |-- wins: long (nullable = true)
 |-- poles: long (nullable = true)
 |-- stage_wins: string (nullable = true)
 |-- chase_bonus: string (nullable = true)
 |-- chase_wins: string (nullable = true)
 |-- chase_stage_wins: string (nullable = true)
 |-- top_5: long (nullable = true)
 |-- top_10: long (nullable = true)
 |-- top_15: long (nullable = true)
 |-- top_20: long (nullable = true)
 |-- dnf: long (nullable = true)
 |-- laps_led: long (nullable = true)
 |-- laps_completed: long (nullable = true)
 |-- money: double (nullable = true)
 |-- avg_start_position: double (nullable = true)
 |-- avg_finish_position: double (nullable = true)
 |-- avg_laps_completed: double (nullable = true)
 |-

In [38]:
# Persist both Spark frames as parquet, replacing any earlier copy.
driver_stats_spark_df.write.mode('overwrite').parquet("/project/parquet_files/driver_stats.parquet")
driver_standings_spark_df.write.mode('overwrite').parquet("/project/parquet_files/driver_standings.parquet")

In [39]:
# Read the parquet copies back and convert them to pandas frames.
driver_stats_df, driver_standings_df = (
    spark.read.parquet(parquet_path).toPandas()
    for parquet_path in (
        "/project/parquet_files/driver_stats.parquet",
        "/project/parquet_files/driver_standings.parquet",
    )
)

In [40]:
# Rename the "id" column to "driver_id" in both frames.
id_to_driver_id = {"id": "driver_id"}
driver_stats_df.rename(columns=id_to_driver_id, inplace=True)
driver_standings_df.rename(columns=id_to_driver_id, inplace=True)

In [41]:
# Turn the "nan value" placeholder strings back into real NaN in both frames.
for driver_frame in (driver_stats_df, driver_standings_df):
    driver_frame.replace(to_replace="nan value", value=np.nan, inplace=True)

In [42]:
# Subset the two driver frames into the individual tables.
#
# Every surrogate key runs from 1 to the table's actual row count. The earlier
# hard-coded upper bounds (482, 484, 25, 478, 780, 2162) only fit one specific
# data snapshot; with any other row count the column assignment raises
# ValueError because the range length no longer matches the frame length.


def _sequential_ids(frame):
    """Return the 1-based surrogate keys ``1..len(frame)`` for ``frame``."""
    return range(1, len(frame) + 1)


def _link_driver(frame, drivers):
    """Replace ``driver_id`` in ``frame`` with the matching ``drivers['id']``.

    The key is looked up with a left join on ``driver_id``; the result drops
    ``driver_id`` and carries the key as its last column, named ``driver``.
    """
    linked = pd.merge(frame, drivers[['driver_id', 'id']], how='left', on='driver_id')
    return linked.drop('driver_id', axis=1).rename(columns={'id': 'driver'})


def _link_race(frame, races):
    """Replace ``series``/``season`` in ``frame`` with the matching ``races['id']``.

    The key is looked up with a left join on ``['series', 'season']``; the
    result drops both columns and carries the key as its last column, named
    ``race_id``.
    """
    linked = pd.merge(frame, races, how='left', on=['series', 'season'])
    return linked.drop(['series', 'season'], axis=1).rename(columns={'id': 'race_id'})


# One row per driver_id (the first occurrence is kept).
personal_info = (
    driver_stats_df[['driver_id', 'first_name', 'last_name', 'full_name', 'gender', 'height',
                     'weight', 'birthday', 'birth_place', 'country']]
    .drop_duplicates(subset='driver_id', keep='first')
    .reset_index(drop=True)
)
personal_info['id'] = _sequential_ids(personal_info)

social_media = driver_stats_df[['driver_id', 'twitter']].drop_duplicates().reset_index(drop=True)
# Keep a row when its driver_id occurs only once, or when the row has no
# missing value; this discards incomplete rows of drivers listed several times.
not_repeated = ~social_media.duplicated(subset='driver_id', keep=False)
complete = social_media.notnull().all(axis=1)
social_media = social_media[not_repeated | complete]
social_media = _link_driver(social_media, personal_info)
social_media['social_media_id'] = _sequential_ids(social_media)

# One row per (season, series) pair, numbered in ascending season order.
race_info = (
    driver_standings_df[['season', 'series']]
    .drop_duplicates()
    .sort_values(by='season', ascending=True)
    .reset_index(drop=True)
)
race_info['id'] = _sequential_ids(race_info)

status_info = driver_standings_df[['driver_id', 'status']].drop_duplicates().reset_index(drop=True)
status_info['status_info_id'] = _sequential_ids(status_info)
status_info = _link_driver(status_info, personal_info)

debut_info = (
    driver_stats_df[['driver_id', 'series', 'rookie_year']]
    .drop_duplicates()
    .reset_index(drop=True)
)
debut_info['debut_info_id'] = _sequential_ids(debut_info)
debut_info = _link_driver(debut_info, personal_info)

# The three tables below follow the same recipe: link the race key, number
# the rows, then link the driver key (this fixes the resulting column order).
earnings = (
    driver_standings_df[['driver_id', 'money', 'series', 'season']]
    .drop_duplicates()
    .reset_index(drop=True)
)
earnings = _link_race(earnings, race_info)
earnings['earnings_id'] = _sequential_ids(earnings)
earnings = _link_driver(earnings, personal_info)

seasonal_achievement = (
    driver_standings_df[['driver_id', 'rank', 'starts', 'points', 'wins', 'stage_wins',
                         'poles', 'top_5', 'top_10', 'top_15', 'top_20', 'series', 'season']]
    .drop_duplicates()
    .reset_index(drop=True)
)
seasonal_achievement = _link_race(seasonal_achievement, race_info)
seasonal_achievement['seasonal_achievement_id'] = _sequential_ids(seasonal_achievement)
seasonal_achievement = _link_driver(seasonal_achievement, personal_info)

detailed_performance = (
    driver_standings_df[['driver_id', 'chase_bonus', 'chase_wins', 'chase_stage_wins', 'laps_led',
                         'laps_completed', 'avg_start_position', 'avg_finish_position',
                         'avg_laps_completed', 'laps_led_pct', 'dnf', 'in_chase', 'behind',
                         'series', 'season']]
    .drop_duplicates()
    .reset_index(drop=True)
)
detailed_performance = _link_race(detailed_performance, race_info)
detailed_performance['detailed_performance_id'] = _sequential_ids(detailed_performance)
detailed_performance = _link_driver(detailed_performance, personal_info)

In [43]:
def na_change_type(df):
    """Replace every NaN in ``df`` with the string 'nan value', in place.

    The frame is modified directly and nothing is returned.
    """
    df.replace(to_replace=np.nan, value='nan value', inplace=True)

In [44]:
# Apply the NaN -> 'nan value' substitution to every table, in place.
table_list = [
    personal_info,
    social_media,
    seasonal_achievement,
    detailed_performance,
    earnings,
    debut_info,
    race_info,
    status_info,
]
for table in table_list:
    na_change_type(table)

### Store the data in Parquet format 

In [45]:
# Convert each pandas table into a Spark DataFrame (same order as listed).
(
    personal_info_df,
    social_media_df,
    seasonal_achievement_df,
    detailed_performance_df,
    earnings_df,
    debut_info_df,
    race_info_df,
    status_info_df,
) = map(
    spark.createDataFrame,
    (
        personal_info,
        social_media,
        seasonal_achievement,
        detailed_performance,
        earnings,
        debut_info,
        race_info,
        status_info,
    ),
)

In [46]:
# Print the inferred schema of every table, one after another.
for spark_table in (
    personal_info_df,
    social_media_df,
    seasonal_achievement_df,
    detailed_performance_df,
    earnings_df,
    debut_info_df,
    race_info_df,
    status_info_df,
):
    spark_table.printSchema()

root
 |-- driver_id: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- full_name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- height: string (nullable = true)
 |-- weight: string (nullable = true)
 |-- birthday: string (nullable = true)
 |-- birth_place: string (nullable = true)
 |-- country: string (nullable = true)
 |-- id: long (nullable = true)

root
 |-- twitter: string (nullable = true)
 |-- driver: long (nullable = true)
 |-- social_media_id: long (nullable = true)

root
 |-- rank: long (nullable = true)
 |-- starts: long (nullable = true)
 |-- points: long (nullable = true)
 |-- wins: long (nullable = true)
 |-- stage_wins: string (nullable = true)
 |-- poles: long (nullable = true)
 |-- top_5: long (nullable = true)
 |-- top_10: long (nullable = true)
 |-- top_15: long (nullable = true)
 |-- top_20: long (nullable = true)
 |-- race_id: long (nullable = true)
 |-- seasonal_achievement_id: lon

In [47]:
def change_data_type(df, col, type_):
    """Return ``df`` with column ``col`` replaced by itself cast to ``type_``.

    ``df`` is left untouched; the result of ``df.withColumn`` is returned.
    """
    converted = df[col].cast(type_)
    return df.withColumn(col, converted)

In [48]:
# Cast the string-typed columns of each table to their target types.
for column_name, target_type in (
    ('birthday', 'timestamp'),
    ('height', 'double'),
    ('weight', 'double'),
):
    personal_info_df = change_data_type(personal_info_df, column_name, target_type)

seasonal_achievement_df = change_data_type(seasonal_achievement_df, 'stage_wins', 'double')

for column_name in ('chase_bonus', 'chase_stage_wins', 'chase_wins', 'behind'):
    detailed_performance_df = change_data_type(detailed_performance_df, column_name, 'double')

In [49]:
# Write every table to its own parquet file, replacing any earlier copy.
for spark_table, parquet_path in (
    (personal_info_df, "/project/parquet_files/personal_info.parquet"),
    (social_media_df, "/project/parquet_files/social_media.parquet"),
    (seasonal_achievement_df, "/project/parquet_files/seasonal_achievement.parquet"),
    (detailed_performance_df, "/project/parquet_files/detailed_performance.parquet"),
    (earnings_df, "/project/parquet_files/earnings.parquet"),
    (debut_info_df, "/project/parquet_files/debut_info.parquet"),
    (race_info_df, "/project/parquet_files/race_info.parquet"),
    (status_info_df, "/project/parquet_files/status_info.parquet"),
):
    spark_table.write.parquet(parquet_path, mode='overwrite')

**Write data into the database**

In [51]:
from configparser import ConfigParser

# Read config.ini file
config_object = ConfigParser()
# ConfigParser.read() silently skips files it cannot open and returns the list
# of files it actually parsed, so an empty result means config.ini is missing.
# Fail here with a clear message instead of an opaque KeyError further down.
if not config_object.read("config.ini"):
    raise FileNotFoundError("config.ini was not found or could not be read")
if not config_object.has_section("USERINFO"):
    raise KeyError("config.ini has no [USERINFO] section")

# Get the whole USERINFO section (connection URI, user and password are read
# from it later), not just the password.
userinfo = config_object["USERINFO"]

In [52]:
# Load every table back from its parquet file as a Spark DataFrame.
(
    personal_info_df,
    social_media_df,
    seasonal_achievement_df,
    detailed_performance_df,
    earnings_df,
    debut_info_df,
    race_info_df,
    status_info_df,
) = (
    spark.read.parquet(parquet_path)
    for parquet_path in (
        "/project/parquet_files/personal_info.parquet",
        "/project/parquet_files/social_media.parquet",
        "/project/parquet_files/seasonal_achievement.parquet",
        "/project/parquet_files/detailed_performance.parquet",
        "/project/parquet_files/earnings.parquet",
        "/project/parquet_files/debut_info.parquet",
        "/project/parquet_files/race_info.parquet",
        "/project/parquet_files/status_info.parquet",
    )
)

In [53]:
# Connection details for PostgreSQL, taken from the USERINFO config section.
postgres_uri = userinfo['postgres_uri']
user = userinfo['user']
password = userinfo['password']

# The same JDBC properties are used for every table.
jdbc_properties = {"user": user, "password": password, "driver": "org.postgresql.Driver"}

# Append each table to the nascar schema. The order is kept as listed:
# personal_info and race_info go first (presumably because the other tables
# reference their ids -- confirm against the database schema).
for spark_table, table_name in (
    (personal_info_df, "nascar.personal_info"),
    (race_info_df, "nascar.race_info"),
    (social_media_df, "nascar.social_media"),
    (seasonal_achievement_df, "nascar.seasonal_achievement"),
    (detailed_performance_df, "nascar.detailed_performance"),
    (earnings_df, "nascar.earnings"),
    (debut_info_df, "nascar.debut_info"),
    (status_info_df, "nascar.status_info"),
):
    spark_table.write.jdbc(url=postgres_uri, table=table_name, mode="append",
                           properties=jdbc_properties)