In [1]:
from pyspark import SparkConf
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.functions import col

In [2]:
conf = SparkConf()  # create the configuration
conf.set("spark.jars", "./*.jar")  # set the spark.jars

spark = SparkSession.builder \
        .config(conf=conf) \
        .master("local") \
        .appName("ETL_F1_data") \
        .getOrCreate()

sc = spark.sparkContext
sqlContext = SQLContext(sc)

# Collecting raw data from mysql

## circuits

In [3]:
circuits_df = sqlContext.read.format('jdbc').options(
          url='jdbc:mysql://mysql:3306/raw_data',
          driver='com.mysql.jdbc.Driver',
          dbtable='circuits',
          user='root',
          password='mysecretpassword').load()
ci = circuits_df.alias("cir")

In [4]:
ci.show()

+---------+--------------+--------------------+------------+---------+--------+---------+----+--------------------+
|circuitId|    circuitRef|                name|    location|  country|     lat|      lng| alt|                 url|
+---------+--------------+--------------------+------------+---------+--------+---------+----+--------------------+
|        1|   albert_park|Albert Park Grand...|   Melbourne|Australia|-37.8497|  144.968|  10|http://en.wikiped...|
|        2|        sepang|Sepang Internatio...|Kuala Lumpur| Malaysia| 2.76083|  101.738|null|http://en.wikiped...|
|        3|       bahrain|Bahrain Internati...|      Sakhir|  Bahrain| 26.0325|  50.5106|null|http://en.wikiped...|
|        4|     catalunya|Circuit de Barcel...|    Montmeló|    Spain|   41.57|  2.26111|null|http://en.wikiped...|
|        5|      istanbul|       Istanbul Park|    Istanbul|   Turkey| 40.9517|   29.405|null|http://en.wikiped...|
|        6|        monaco|   Circuit de Monaco| Monte-Carlo|   Monaco| 4

## constructors

In [5]:
constructors_df = sqlContext.read.format('jdbc').options(
          url='jdbc:mysql://mysql:3306/raw_data',
          driver='com.mysql.jdbc.Driver',
          dbtable='constructors',
          user='root',
          password='mysecretpassword').load()
co = constructors_df.alias("co")

In [6]:
co.show()

+-------------+--------------+-----------+-----------+--------------------+
|constructorId|constructorRef|       name|nationality|                 url|
+-------------+--------------+-----------+-----------+--------------------+
|            1|       mclaren|    McLaren|    British|http://en.wikiped...|
|            2|    bmw_sauber| BMW Sauber|     German|http://en.wikiped...|
|            3|      williams|   Williams|    British|http://en.wikiped...|
|            4|       renault|    Renault|     French|http://en.wikiped...|
|            5|    toro_rosso| Toro Rosso|    Italian|http://en.wikiped...|
|            6|       ferrari|    Ferrari|    Italian|http://en.wikiped...|
|            7|        toyota|     Toyota|   Japanese|http://en.wikiped...|
|            8|   super_aguri|Super Aguri|   Japanese|http://en.wikiped...|
|            9|      red_bull|   Red Bull|   Austrian|http://en.wikiped...|
|           10|   force_india|Force India|     Indian|http://en.wikiped...|
|           

## drivers

In [7]:
drivers_df = sqlContext.read.format('jdbc').options(
          url='jdbc:mysql://mysql:3306/raw_data',
          driver='com.mysql.jdbc.Driver',
          dbtable='drivers',
          user='root',
          password='mysecretpassword').load()
dr = drivers_df.alias("dr")

In [8]:
dr.show()

+--------+----------+------+----+---------+----------+----------+-----------+--------------------+
|driverId| driverRef|number|code| forename|   surname|       dob|nationality|                 url|
+--------+----------+------+----+---------+----------+----------+-----------+--------------------+
|       1|  hamilton|    44| HAM|    Lewis|  Hamilton|1985-01-07|    British|http://en.wikiped...|
|       2|  heidfeld|  null| HEI|     Nick|  Heidfeld|1977-05-10|     German|http://en.wikiped...|
|       3|   rosberg|     6| ROS|     Nico|   Rosberg|1985-06-27|     German|http://en.wikiped...|
|       4|    alonso|    14| ALO| Fernando|    Alonso|1981-07-29|    Spanish|http://en.wikiped...|
|       5|kovalainen|  null| KOV|   Heikki|Kovalainen|1981-10-19|    Finnish|http://en.wikiped...|
|       6|  nakajima|  null| NAK|   Kazuki|  Nakajima|1985-01-11|   Japanese|http://en.wikiped...|
|       7|  bourdais|  null| BOU|Sébastien|  Bourdais|1979-02-28|     French|http://en.wikiped...|
|       8|

## lapTimes

In [9]:
lapTimes_df = sqlContext.read.format('jdbc').options(
          url='jdbc:mysql://mysql:3306/raw_data',
          driver='com.mysql.jdbc.Driver',
          dbtable='lapTimes',
          user='root',
          password='mysecretpassword').load()
lT = lapTimes_df.alias("lT")

In [10]:
lT.show()

+------+--------+---+--------+--------+------------+
|raceId|driverId|lap|position|    time|milliseconds|
+------+--------+---+--------+--------+------------+
|   841|      20|  1|       1|1:38.109|       98109|
|   841|      20|  2|       1|1:33.006|       93006|
|   841|      20|  3|       1|1:32.713|       92713|
|   841|      20|  4|       1|1:32.803|       92803|
|   841|      20|  5|       1|1:32.342|       92342|
|   841|      20|  6|       1|1:32.605|       92605|
|   841|      20|  7|       1|1:32.502|       92502|
|   841|      20|  8|       1|1:32.537|       92537|
|   841|      20|  9|       1|1:33.240|       93240|
|   841|      20| 10|       1|1:32.572|       92572|
|   841|      20| 11|       1|1:32.669|       92669|
|   841|      20| 12|       1|1:32.902|       92902|
|   841|      20| 13|       1|1:33.698|       93698|
|   841|      20| 14|       3|1:52.075|      112075|
|   841|      20| 15|       4|1:38.385|       98385|
|   841|      20| 16|       2|1:31.548|       

## races

In [11]:
races_df = sqlContext.read.format('jdbc').options(
          url='jdbc:mysql://mysql:3306/raw_data',
          driver='com.mysql.jdbc.Driver',
          dbtable='races',
          user='root',
          password='mysecretpassword').load()
ra = races_df.alias("ra")

In [12]:
ra.show()

+------+----+-----+---------+--------------------+----------+-------------------+--------------------+
|raceId|year|round|circuitId|                name|      date|               time|                 url|
+------+----+-----+---------+--------------------+----------+-------------------+--------------------+
|     1|2009|    1|        1|Australian Grand ...|2009-03-29|1970-01-01 06:00:00|http://en.wikiped...|
|     2|2009|    2|        2|Malaysian Grand Prix|2009-04-05|1970-01-01 09:00:00|http://en.wikiped...|
|     3|2009|    3|       17|  Chinese Grand Prix|2009-04-19|1970-01-01 07:00:00|http://en.wikiped...|
|     4|2009|    4|        3|  Bahrain Grand Prix|2009-04-26|1970-01-01 12:00:00|http://en.wikiped...|
|     5|2009|    5|        4|  Spanish Grand Prix|2009-05-10|1970-01-01 12:00:00|http://en.wikiped...|
|     6|2009|    6|        6|   Monaco Grand Prix|2009-05-24|1970-01-01 12:00:00|http://en.wikiped...|
|     7|2009|    7|        5|  Turkish Grand Prix|2009-06-07|1970-01-01 1

## results

In [13]:
results_df = sqlContext.read.format('jdbc').options(
          url='jdbc:mysql://mysql:3306/raw_data',
          driver='com.mysql.jdbc.Driver',
          dbtable='results',
          user='root',
          password='mysecretpassword').load()
re = results_df.alias("re")

In [14]:
re.show()

+--------+------+--------+-------------+------+----+--------+------------+-------------+------+----+-----------+------------+----------+----+--------------+---------------+--------+
|resultId|raceId|driverId|constructorId|number|grid|position|positionText|positionOrder|points|laps|       time|milliseconds|fastestLap|rank|fastestLapTime|fastestLapSpeed|statusId|
+--------+------+--------+-------------+------+----+--------+------------+-------------+------+----+-----------+------------+----------+----+--------------+---------------+--------+
|       1|    18|       1|            1|    22|   1|       1|           1|            1|  10.0|  58|1:34:50.616|     5690616|        39|   2|      1:27.452|        218.300|       1|
|       2|    18|       2|            2|     3|   5|       2|           2|            2|   8.0|  58|     +5.478|     5696094|        41|   3|      1:27.739|        217.586|       1|
|       3|    18|       3|            3|     7|   7|       3|           3|            3|  

## status

In [15]:
status_df = sqlContext.read.format('jdbc').options(
          url='jdbc:mysql://mysql:3306/raw_data',
          driver='com.mysql.jdbc.Driver',
          dbtable='status',
          user='root',
          password='mysecretpassword').load()
st = status_df.alias("st")

In [16]:
st.show()

+--------+------------+
|statusId|      status|
+--------+------------+
|       1|    Finished|
|       2|Disqualified|
|       3|    Accident|
|       4|   Collision|
|       5|      Engine|
|       6|     Gearbox|
|       7|Transmission|
|       8|      Clutch|
|       9|  Hydraulics|
|      10|  Electrical|
|      11|      +1 Lap|
|      12|     +2 Laps|
|      13|     +3 Laps|
|      14|     +4 Laps|
|      15|     +5 Laps|
|      16|     +6 Laps|
|      17|     +7 Laps|
|      18|     +8 Laps|
|      19|     +9 Laps|
|      20|    Spun off|
+--------+------------+
only showing top 20 rows



# Transform data

In [17]:
gp_results_df = re.join(dr, re.driverId == dr.driverId, how="inner") \
    .join(ra, re.raceId == ra.raceId, how="inner") \
    .join(co, re.constructorId == co.constructorId, how="inner") \
    .join(st, re.statusId == st.statusId, how="inner") \
    .where(col("position").isNotNull()) \
    .select(
        ra.year.alias("season"),
        ra.name.alias("grand_prix"),
        re.position.alias("pos_num"),
        re.points,
        dr.driverRef.alias("driver"),
        dr.nationality.alias("driver_nation"),
        co.constructorRef.alias("constructor"),
        co.nationality.alias("constructor_nation"),
        st.status
)
gr = gp_results_df.alias("gr")

In [18]:
gr.show()

+------+--------------------+-------+------+------------------+-------------+---------------+------------------+---------+
|season|          grand_prix|pos_num|points|            driver|driver_nation|    constructor|constructor_nation|   status|
+------+--------------------+-------+------+------------------+-------------+---------------+------------------+---------+
|  2019|   French Grand Prix|     20|   0.0|          grosjean|       French|           haas|          American|  Retired|
|  2018|   German Grand Prix|     16|   0.0|            alonso|      Spanish|        mclaren|           British|  Retired|
|  2012|Singapore Grand Prix|     18|   0.0|       bruno_senna|    Brazilian|       williams|           British|  Retired|
|  2013|Malaysian Grand Prix|     18|   0.0|         ricciardo|   Australian|     toro_rosso|           Italian|  Retired|
|  2012|   Indian Grand Prix|     22|   0.0|michael_schumacher|       German|       mercedes|            German|  Retired|
|  2013|  Britis

In [19]:
df = gp_results_df.groupBy("driver_nation")

In [20]:
df.sum("points").orderBy(col("sum(points)").desc()).show()

+-------------+-----------+
|driver_nation|sum(points)|
+-------------+-----------+
|      British|    8941.14|
|       German|     7810.5|
|      Finnish|     3875.5|
|    Brazilian|     3423.0|
|       French|    3028.33|
|   Australian|     2561.5|
|      Spanish|     2248.0|
|      Italian|    2030.66|
|     American|      996.0|
|     Austrian|      990.5|
|        Dutch|      975.0|
|    Argentine|     695.92|
|      Mexican|      675.0|
|New Zealander|      541.5|
|     Canadian|      409.0|
|      Swedish|      396.0|
|      Belgian|      388.0|
|        Swiss|      348.0|
|   Monegasque|      307.0|
|    Colombian|      307.0|
+-------------+-----------+
only showing top 20 rows



In [21]:
circutis_laps_df = lT.join(dr, lT.driverId == dr.driverId, how="inner") \
    .join(ra, lT.raceId == ra.raceId, how="inner") \
    .join(ci, ra.circuitId == ci.circuitId, how="inner") \
    .where(col("milliseconds").isNotNull()) \
    .select(
        ci.circuitRef.alias("circuit"),
        ra.year.alias("season"),
        dr.driverRef.alias("driver"),
        lT.time.alias("time_str"),
        lT.milliseconds.alias("time_ms")
)
cl = circutis_laps_df.alias("cl")

In [22]:
cl.show()

+-------+------+--------------+--------+-------+
|circuit|season|        driver|time_str|time_ms|
+-------+------+--------------+--------+-------+
| ricard|  2019|max_verstappen|1:45.972| 105972|
| ricard|  2019|max_verstappen|1:37.252|  97252|
| ricard|  2019|max_verstappen|1:37.465|  97465|
| ricard|  2019|max_verstappen|1:37.105|  97105|
| ricard|  2019|max_verstappen|1:36.865|  96865|
| ricard|  2019|max_verstappen|1:36.427|  96427|
| ricard|  2019|max_verstappen|1:36.522|  96522|
| ricard|  2019|max_verstappen|1:36.618|  96618|
| ricard|  2019|max_verstappen|1:36.265|  96265|
| ricard|  2019|max_verstappen|1:36.353|  96353|
| ricard|  2019|max_verstappen|1:36.039|  96039|
| ricard|  2019|max_verstappen|1:36.009|  96009|
| ricard|  2019|max_verstappen|1:36.075|  96075|
| ricard|  2019|max_verstappen|1:35.876|  95876|
| ricard|  2019|max_verstappen|1:36.011|  96011|
| ricard|  2019|max_verstappen|1:36.038|  96038|
| ricard|  2019|max_verstappen|1:36.272|  96272|
| ricard|  2019|max_

# Write data to the Vertica

In [23]:
opts_gr={
    "table": "gp_results",
    "db": "docker",
    "user": "dbadmin",
    "password": "",
    "host": "vertica"
}

In [24]:
gr.write.save(format="com.vertica.spark.datasource.DefaultSource", mode="overwrite", **opts_gr)

Py4JJavaError: An error occurred while calling o205.save.
: java.lang.Exception: ERROR: S2V.save(): did not pass the Vertica requirements pre-check.  The following problems were encountered: hdfs_url scheme should be 'hdfs', but user provided:null. hdfs_url path is not valid, user provided:. java.lang.IllegalArgumentException: Can not create a Path from an empty string
	at com.vertica.spark.s2v.S2V.save(S2V.scala:384)
	at com.vertica.spark.datasource.DefaultSource.createRelation(VerticaSource.scala:88)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:83)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:81)
	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:80)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
