In [1]:
import pandas as pd 

In [2]:
import findspark

findspark.find()

'C:\\Users\\Alex Garriga\\AppData\\Local\\Programs\\Python\\Python310\\lib\\site-packages\\pyspark'

In [3]:
findspark.init('C:\\Users\\Alex Garriga\\AppData\\Local\\Programs\\Python\\Python310\\lib\\site-packages\\pyspark')

In [4]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

In [5]:
spark = SparkSession.builder.master("local").appName("Spark 1").getOrCreate()

In [6]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType

In [7]:
circuits_schema = StructType(fields=[StructField("circuitId", IntegerType(), False),
                                     StructField("circuitRef", StringType(), True),
                                     StructField("name", StringType(), True),
                                     StructField("location", StringType(), True),
                                     StructField("country", StringType(), True),
                                     StructField("lat", DoubleType(), True),
                                     StructField("lng", DoubleType(), True),
                                     StructField("alt", IntegerType(), True),
                                     StructField("url", StringType(), True)
])

In [8]:
circuits_df = spark.read.option("header",True).schema(circuits_schema).csv("C:/Users/Alex Garriga/Downloads/f1db_csv/circuits.csv")

In [9]:
type(circuits_df)

pyspark.sql.dataframe.DataFrame

In [10]:
circuits_df.show()

+---------+--------------+--------------------+------------+---------+--------+---------+---+--------------------+
|circuitId|    circuitRef|                name|    location|  country|     lat|      lng|alt|                 url|
+---------+--------------+--------------------+------------+---------+--------+---------+---+--------------------+
|        1|   albert_park|Albert Park Grand...|   Melbourne|Australia|-37.8497|  144.968| 10|http://en.wikiped...|
|        2|        sepang|Sepang Internatio...|Kuala Lumpur| Malaysia| 2.76083|  101.738| 18|http://en.wikiped...|
|        3|       bahrain|Bahrain Internati...|      Sakhir|  Bahrain| 26.0325|  50.5106|  7|http://en.wikiped...|
|        4|     catalunya|Circuit de Barcel...|    Montmeló|    Spain|   41.57|  2.26111|109|http://en.wikiped...|
|        5|      istanbul|       Istanbul Park|    Istanbul|   Turkey| 40.9517|   29.405|130|http://en.wikiped...|
|        6|        monaco|   Circuit de Monaco| Monte-Carlo|   Monaco| 43.7347| 

In [11]:
circuits_df.printSchema()

root
 |-- circuitId: integer (nullable = true)
 |-- circuitRef: string (nullable = true)
 |-- name: string (nullable = true)
 |-- location: string (nullable = true)
 |-- country: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- lng: double (nullable = true)
 |-- alt: integer (nullable = true)
 |-- url: string (nullable = true)



In [12]:
circuits_df.describe().show()

+-------+------------------+----------+-------+---------+---------+------------------+-----------------+-----------------+--------------------+
|summary|         circuitId|circuitRef|   name| location|  country|               lat|              lng|              alt|                 url|
+-------+------------------+----------+-------+---------+---------+------------------+-----------------+-----------------+--------------------+
|  count|                79|        79|     79|       79|       79|                79|               79|               77|                  79|
|   mean|              40.0|      null|   null|     null|     null|   33.519034556962|3.096167088607595|247.4935064935065|                null|
| stddev|22.949219304078007|      null|   null|     null|     null|22.625426873911685|64.96320431281227|363.2672505910991|                null|
|    min|                 1|       BAK|A1-Ring|Abu Dhabi|Argentina|          -37.8497|         -118.189|               -7|http://en.wiki

In [13]:
from pyspark.sql.functions import col

In [14]:
circuits_selected_df = circuits_df.select(col("circuitId"), col("circuitRef"), col("name"), col("location"), col("country"), col("lat"), col("lng"), col("alt"))

In [15]:
display(circuits_selected_df)

DataFrame[circuitId: int, circuitRef: string, name: string, location: string, country: string, lat: double, lng: double, alt: int]

In [16]:
circuits_selected_df.show()

+---------+--------------+--------------------+------------+---------+--------+---------+---+
|circuitId|    circuitRef|                name|    location|  country|     lat|      lng|alt|
+---------+--------------+--------------------+------------+---------+--------+---------+---+
|        1|   albert_park|Albert Park Grand...|   Melbourne|Australia|-37.8497|  144.968| 10|
|        2|        sepang|Sepang Internatio...|Kuala Lumpur| Malaysia| 2.76083|  101.738| 18|
|        3|       bahrain|Bahrain Internati...|      Sakhir|  Bahrain| 26.0325|  50.5106|  7|
|        4|     catalunya|Circuit de Barcel...|    Montmeló|    Spain|   41.57|  2.26111|109|
|        5|      istanbul|       Istanbul Park|    Istanbul|   Turkey| 40.9517|   29.405|130|
|        6|        monaco|   Circuit de Monaco| Monte-Carlo|   Monaco| 43.7347|  7.42056|  7|
|        7|    villeneuve|Circuit Gilles Vi...|    Montreal|   Canada|    45.5| -73.5228| 13|
|        8|   magny_cours|Circuit de Nevers...| Magny Cours|

In [17]:
#Renaming Columns
circuits_renamed_df = circuits_selected_df.withColumnRenamed("circuitId", "circuit_id") \
.withColumnRenamed("circuitRef", "circuit_ref") \
.withColumnRenamed("lat", "latitude") \
.withColumnRenamed("lng", "longitude") \
.withColumnRenamed("alt", "altitude") 

In [18]:
circuits_renamed_df.show()

+----------+--------------+--------------------+------------+---------+--------+---------+--------+
|circuit_id|   circuit_ref|                name|    location|  country|latitude|longitude|altitude|
+----------+--------------+--------------------+------------+---------+--------+---------+--------+
|         1|   albert_park|Albert Park Grand...|   Melbourne|Australia|-37.8497|  144.968|      10|
|         2|        sepang|Sepang Internatio...|Kuala Lumpur| Malaysia| 2.76083|  101.738|      18|
|         3|       bahrain|Bahrain Internati...|      Sakhir|  Bahrain| 26.0325|  50.5106|       7|
|         4|     catalunya|Circuit de Barcel...|    Montmeló|    Spain|   41.57|  2.26111|     109|
|         5|      istanbul|       Istanbul Park|    Istanbul|   Turkey| 40.9517|   29.405|     130|
|         6|        monaco|   Circuit de Monaco| Monte-Carlo|   Monaco| 43.7347|  7.42056|       7|
|         7|    villeneuve|Circuit Gilles Vi...|    Montreal|   Canada|    45.5| -73.5228|      13|


In [19]:
from pyspark.sql.functions import current_timestamp

In [20]:
circuits_final_df = circuits_renamed_df.withColumn("ingestion_date", current_timestamp()) 

In [21]:
circuits_final_df.show()

+----------+--------------+--------------------+------------+---------+--------+---------+--------+--------------------+
|circuit_id|   circuit_ref|                name|    location|  country|latitude|longitude|altitude|      ingestion_date|
+----------+--------------+--------------------+------------+---------+--------+---------+--------+--------------------+
|         1|   albert_park|Albert Park Grand...|   Melbourne|Australia|-37.8497|  144.968|      10|2022-06-16 12:16:...|
|         2|        sepang|Sepang Internatio...|Kuala Lumpur| Malaysia| 2.76083|  101.738|      18|2022-06-16 12:16:...|
|         3|       bahrain|Bahrain Internati...|      Sakhir|  Bahrain| 26.0325|  50.5106|       7|2022-06-16 12:16:...|
|         4|     catalunya|Circuit de Barcel...|    Montmeló|    Spain|   41.57|  2.26111|     109|2022-06-16 12:16:...|
|         5|      istanbul|       Istanbul Park|    Istanbul|   Turkey| 40.9517|   29.405|     130|2022-06-16 12:16:...|
|         6|        monaco|   Ci

In [27]:
circuits_final_df.write.mode("overwrite").csv("C:/Users/Alex Garriga/OneDrive/Documents/testing/circuits_final_df/csv/circuits.csv")

In [25]:
circuits_final_df.write.mode("overwrite").parquet("C:/Users/Alex Garriga/OneDrive/Documents/testing/circuits_final_df/parquet")

In [50]:
from pyspark.sql.functions import count, countDistinct, sum, max

In [29]:
circuits_final_df.select(count("*")).show()

+--------+
|count(1)|
+--------+
|      79|
+--------+



In [30]:
circuits_final_df.select(countDistinct("country")).show()

+-----------------------+
|count(DISTINCT country)|
+-----------------------+
|                     35|
+-----------------------+



In [31]:
circuits_final_df.select(sum("altitude")).show()

+-------------+
|sum(altitude)|
+-------------+
|        19057|
+-------------+



In [32]:
circuits_final_df.filter("circuit_ref='villeneuve'").select(sum("altitude")).show()

+-------------+
|sum(altitude)|
+-------------+
|           13|
+-------------+



In [33]:
circuits_final_df.filter("country='Spain'").select(sum("altitude")).show()

+-------------+
|sum(altitude)|
+-------------+
|          923|
+-------------+



In [51]:
circuits_final_df.groupBy("country") \
.agg(max("altitude").alias("max_altitude"), countDistinct("location").alias("number_of_location")).show()

+----------+------------+------------------+
|   country|max_altitude|number_of_location|
+----------+------------+------------------+
|    Russia|           2|                 1|
|    Sweden|         153|                 1|
| Singapore|          18|                 1|
|  Malaysia|          18|                 1|
|    Turkey|         130|                 1|
|   Germany|         578|                 3|
|    France|         790|                 7|
| Argentina|           8|                 1|
|   Belgium|         401|                 3|
|     Qatar|        null|                 1|
|     India|         194|                 1|
|     China|           5|                 1|
|     Italy|         255|                 4|
|     Spain|         609|                 5|
|   Morocco|          19|                 1|
|    Monaco|           7|                 1|
|       USA|         639|                11|
|    Mexico|        2227|                 1|
|Azerbaijan|          -7|                 1|
|        U