In [2]:
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib as mpl
import seaborn as sns
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import functions as F
from pyspark.ml.stat import Correlation


#pd.set_option('display.max_columns', None)

# Spark session for the whole notebook. getOrCreate() reuses a session that is
# already active in this kernel, so re-running this cell does not start a new one.
# (The original `spark = spark = ...` redundant double assignment is removed.)
spark = (
    SparkSession.builder
    .appName("CCproject")
    .getOrCreate()
)

In [9]:
# Explicit column layout for circuits.csv (all columns nullable).
circuits_schema = (
    StructType()
    .add("circuitId", IntegerType(), True)
    .add("circuitRef", StringType(), True)
    .add("name", StringType(), True)
    .add("location", StringType(), True)
    .add("country", StringType(), True)
    .add("lat", DoubleType(), True)
    .add("lng", DoubleType(), True)
    .add("alt", IntegerType(), True)
    .add("url", StringType(), True)
)
    
# Explicit column layout for drivers.csv; `dob` is parsed as a date.
drivers_schema = (
    StructType()
    .add("driverId", IntegerType(), True)
    .add("driverRef", StringType(), True)
    .add("number", IntegerType(), True)
    .add("code", StringType(), True)
    .add("forename", StringType(), True)
    .add("surname", StringType(), True)
    .add("dob", DateType(), True)
    .add("nationality", StringType(), True)
    .add("url", StringType(), True)
)

# Explicit column layout for driver_standings.csv; points are floating point.
driver_standings_schema = (
    StructType()
    .add("driverStandingsId", IntegerType(), True)
    .add("raceId", IntegerType(), True)
    .add("driverId", IntegerType(), True)
    .add("points", DoubleType(), True)
    .add("position", IntegerType(), True)
    .add("positionText", StringType(), True)
    .add("wins", IntegerType(), True)
)

# Explicit column layout for qualifying.csv.
# This was previously a verbatim copy of pitstops_schema (stop/lap/duration/...),
# which is wrong for qualifying data. Because the file is read with header=True
# and an explicit schema, Spark skips the header and assigns schema fields BY
# POSITION, so a wrong layout silently mislabels/mistypes every column.
# NOTE(review): assumes the standard Ergast qualifying.csv header
# `qualifyId,raceId,driverId,constructorId,number,position,q1,q2,q3` — confirm
# against the actual file header.
quali_schema = StructType([
    StructField("qualifyId", IntegerType(), True),
    StructField("raceId", IntegerType(), True),
    StructField("driverId", IntegerType(), True),
    StructField("constructorId", IntegerType(), True),
    StructField("number", IntegerType(), True),
    StructField("position", IntegerType(), True),
    # Session lap times are kept as strings (e.g. "1:26.572"), matching how the
    # other time-of-lap columns in this notebook are typed.
    StructField("q1", StringType(), True),
    StructField("q2", StringType(), True),
    StructField("q3", StringType(), True)
])

# Explicit column layout for pit_stops.csv; `time` and `duration` stay strings.
pitstops_schema = (
    StructType()
    .add("raceId", IntegerType(), True)
    .add("driverId", IntegerType(), True)
    .add("stop", IntegerType(), True)
    .add("lap", IntegerType(), True)
    .add("time", StringType(), True)
    .add("duration", StringType(), True)
    .add("milliseconds", IntegerType(), True)
)

# Explicit column layout for races.csv. Session dates are parsed as dates;
# the matching session times are kept as strings.
races_schema = (
    StructType()
    .add("raceId", IntegerType(), True)
    .add("year", IntegerType(), True)
    .add("round", IntegerType(), True)
    .add("circuitId", IntegerType(), True)
    .add("name", StringType(), True)
    .add("date", DateType(), True)
    .add("time", StringType(), True)
    .add("url", StringType(), True)
    .add("fp1_date", DateType(), True)
    .add("fp1_time", StringType(), True)
    .add("fp2_date", DateType(), True)
    .add("fp2_time", StringType(), True)
    .add("fp3_date", DateType(), True)
    .add("fp3_time", StringType(), True)
    .add("quali_date", DateType(), True)
    .add("quali_time", StringType(), True)
    .add("sprint_date", DateType(), True)
    .add("sprint_time", StringType(), True)
)

# Explicit column layout for results.csv. Points are floating point; lap-time
# and speed columns are kept as strings.
results_schema = (
    StructType()
    .add("resultId", IntegerType(), True)
    .add("raceId", IntegerType(), True)
    .add("driverId", IntegerType(), True)
    .add("constructorId", IntegerType(), True)
    .add("number", IntegerType(), True)
    .add("grid", IntegerType(), True)
    .add("position", IntegerType(), True)
    .add("positionText", StringType(), True)
    .add("positionOrder", IntegerType(), True)
    .add("points", DoubleType(), True)
    .add("laps", IntegerType(), True)
    .add("time", StringType(), True)
    .add("milliseconds", IntegerType(), True)
    .add("fastestLap", IntegerType(), True)
    .add("rank", IntegerType(), True)
    .add("fastestLapTime", StringType(), True)
    .add("fastestLapSpeed", StringType(), True)
    .add("statusId", IntegerType(), True)
)

# Explicit column layout for constructor_standings.csv; points are floating point.
constructor_standings_schema = (
    StructType()
    .add("constructorStandingsId", IntegerType(), True)
    .add("raceId", IntegerType(), True)
    .add("constructorId", IntegerType(), True)
    .add("points", DoubleType(), True)
    .add("position", IntegerType(), True)
    .add("positionText", StringType(), True)
    .add("wins", IntegerType(), True)
)

# Explicit column layout for constructor_results.csv.
# `points` is DoubleType, consistent with the other points columns in this
# notebook (driver_standings, results, constructor_standings). It was IntegerType,
# so any fractional points value (e.g. a half-points race) failed to parse: on
# Spark 3 the field silently became null, and on Spark <= 2.4 (PERMISSIVE mode)
# the whole row became null.
constructor_results_schema = StructType([
    StructField("constructorResultsId", IntegerType(), True),
    StructField("raceId", IntegerType(), True),
    StructField("constructorId", IntegerType(), True),
    StructField("points", DoubleType(), True),
    StructField("status", StringType(), True)
])

# Explicit column layout for constructors.csv (all columns nullable).
constructors_schema = (
    StructType()
    .add("constructorId", IntegerType(), True)
    .add("constructorRef", StringType(), True)
    .add("name", StringType(), True)
    .add("nationality", StringType(), True)
    .add("url", StringType(), True)
)

# Explicit column layout for status.csv: a statusId -> status text lookup.
status_schema = (
    StructType()
    .add("statusId", IntegerType(), True)
    .add("status", StringType(), True)
)

# Explicit column layout for lap_times.csv; `time` stays a string and the
# numeric lap duration is in `milliseconds`.
lap_times_schema = (
    StructType()
    .add("raceId", IntegerType(), True)
    .add("driverId", IntegerType(), True)
    .add("lap", IntegerType(), True)
    .add("position", IntegerType(), True)
    .add("time", StringType(), True)
    .add("milliseconds", IntegerType(), True)
)

In [10]:
# Location of the F1 CSV files on HDFS; change it here to point at another copy.
F1_DATA_DIR = "hdfs://namenode:8020/f1data"


def _read_f1_csv(file_name, schema):
    """Read one F1 CSV file from F1_DATA_DIR with an explicit schema.

    Parameters
    ----------
    file_name : str
        File name relative to F1_DATA_DIR, e.g. "circuits.csv".
    schema : StructType
        Column layout. With header=True and a user schema, Spark skips the header
        line and applies the schema by position, so field order must match the
        file's column order.

    Returns
    -------
    pyspark.sql.DataFrame
        Lazily-evaluated DataFrame over the file.
    """
    return spark.read.csv(
        F1_DATA_DIR + "/" + file_name,
        header=True,
        schema=schema,
        # NOTE(review): assumes the files use the Ergast export convention of "\N"
        # for missing values — confirm against the data. Without this, string
        # columns keep a literal "\N" and numeric/date columns fail to parse (on
        # Spark <= 2.4 PERMISSIVE mode that nulls the entire row). Harmless if the
        # files never contain "\N".
        nullValue="\\N",
    )


circuits = _read_f1_csv("circuits.csv", circuits_schema)
drivers = _read_f1_csv("drivers.csv", drivers_schema)
driver_standings = _read_f1_csv("driver_standings.csv", driver_standings_schema)
quali = _read_f1_csv("qualifying.csv", quali_schema)
pitstops = _read_f1_csv("pit_stops.csv", pitstops_schema)
races = _read_f1_csv("races.csv", races_schema)
results = _read_f1_csv("results.csv", results_schema)
constructor_standings = _read_f1_csv("constructor_standings.csv", constructor_standings_schema)
constructor_results = _read_f1_csv("constructor_results.csv", constructor_results_schema)
constructors = _read_f1_csv("constructors.csv", constructors_schema)
status = _read_f1_csv("status.csv", status_schema)
laptimes = _read_f1_csv("lap_times.csv", lap_times_schema)