In [1]:
%load_ext autoreload
%autoreload 2

import pandas as pd
import pyspark
import numpy as np
import pyspark.sql.functions as sf
import matplotlib.pyplot as plt
%matplotlib inline


import logging
logging.basicConfig(level=logging.INFO)

# Create spark session
spark = (pyspark.sql.SparkSession.builder.getOrCreate())

sc = spark.sparkContext

In [2]:
from pyspark.sql.window import Window

In [3]:
import pyspark.sql.functions as sf

In [4]:
from pyspark.sql.dataframe import DataFrame

def pipe(df, f, *args, **kwargs):
    return f(df, *args, **kwargs)

DataFrame.pipe = pipe

In [9]:
from formula1 import pyspark_preprocessing

In [10]:
work_df = (
    pyspark_preprocessing.read_data()
    .pipe(pyspark_preprocessing.join_constructors)
    .pipe(pyspark_preprocessing.join_drivers)
    .pipe(pyspark_preprocessing.join_races)
    .pipe(pyspark_preprocessing.join_status)
    .pipe(pyspark_preprocessing.drop_columns)
    .pipe(pyspark_preprocessing.sort_races)
    .pipe(pyspark_preprocessing.average_finishing)
    .pipe(pyspark_preprocessing.average_finishing_percircuit)
    .pipe(pyspark_preprocessing.get_wins)
    .pipe(pyspark_preprocessing.change_dtypes)
    #.pipe(pyspark_preprocessing.get_wins_per_circuit)
)

INFO:formula1.utils:[read_data] rows=23777,  time=0:00:03.529742
INFO:formula1.utils:[join_constructors] rows=23777,  time=0:00:00.284423
INFO:formula1.utils:[join_drivers] rows=23777,  time=0:00:00.249258
INFO:formula1.utils:[join_races] rows=23777,  time=0:00:00.184660
INFO:formula1.utils:[join_status] rows=23777,  time=0:00:00.140507
INFO:formula1.utils:[drop_columns] rows=23777,  time=0:00:00.011408
INFO:formula1.utils:[sort_races] rows=23777,  time=0:00:00.035939
INFO:formula1.utils:[average_finishing] rows=23777,  time=0:00:00.053202
INFO:formula1.utils:[average_finishing_percircuit] rows=23777,  time=0:00:00.024379
INFO:formula1.utils:[get_wins] rows=23777,  time=0:00:00.034750
INFO:formula1.utils:[change_dtypes] rows=23777,  time=0:00:00.135780


In [27]:
work_df.columns

['statusId',
 'raceId',
 'driverId',
 'constructorId',
 'number',
 'grid',
 'position',
 'positionOrder',
 'points',
 'laps',
 'milliseconds',
 'fastestLap',
 'rank',
 'fastestLapTime',
 'fastestLapSpeed',
 'constructorRef',
 'name',
 'nationality',
 'driverRef',
 'number',
 'forename',
 'surname',
 'nationality',
 'year',
 'round',
 'circuitId',
 'name',
 'date',
 'status',
 'mean_position_overall',
 'mean_position_percircuit',
 'win']

In [28]:
work_df.show()

+--------+------+--------+-------------+------+----+--------+-------------+------+----+------------+----------+----+--------------+---------------+--------------+---------+-----------+---------+------+--------+-------+-----------+----+-----+---------+--------------------+-------------------+------------------+---------------------+------------------------+---+
|statusId|raceId|driverId|constructorId|number|grid|position|positionOrder|points|laps|milliseconds|fastestLap|rank|fastestLapTime|fastestLapSpeed|constructorRef|     name|nationality|driverRef|number|forename|surname|nationality|year|round|circuitId|                name|               date|            status|mean_position_overall|mean_position_percircuit|win|
+--------+------+--------+-------------+------+----+--------+-------------+------+----+------------+----------+----+--------------+---------------+--------------+---------+-----------+---------+------+--------+-------+-----------+----+-----+---------+--------------------+--

In [15]:
(work_df.filter((sf.col('driverRef') == 'hamilton') & (sf.col('year') == 2017))
 .select(sf.col('driverRef'), sf.col('year'), sf.col('circuitId'), sf.col('win'))
 .sort(sf.col('raceId'))
 .show()
)

+---------+----+---------+-----+
|driverRef|year|circuitId|  win|
+---------+----+---------+-----+
| hamilton|2017|        1|false|
| hamilton|2017|       17| true|
| hamilton|2017|        3|false|
| hamilton|2017|       71|false|
| hamilton|2017|        4| true|
| hamilton|2017|        6|false|
| hamilton|2017|        7| true|
| hamilton|2017|       73|false|
| hamilton|2017|       70|false|
| hamilton|2017|        9| true|
| hamilton|2017|       11|false|
| hamilton|2017|       13| true|
| hamilton|2017|       14| true|
| hamilton|2017|       15| true|
| hamilton|2017|        2|false|
| hamilton|2017|       22| true|
| hamilton|2017|       69| true|
| hamilton|2017|       32|false|
| hamilton|2017|       18|false|
| hamilton|2017|       24|false|
+---------+----+---------+-----+



In [30]:
work_df.select('win').show()

+---+
|win|
+---+
|  1|
|  0|
|  0|
|  0|
|  0|
|  0|
|  0|
|  0|
|  0|
|  0|
|  0|
|  0|
|  0|
|  0|
|  0|
|  0|
|  0|
|  0|
|  0|
|  0|
+---+
only showing top 20 rows



In [52]:
work_df.groupby('driverRef').agg(sf.sum(work_df.win)).show()

+------------+--------+
|   driverRef|sum(win)|
+------------+--------+
|       godia|       0|
|      pollet|       0|
|       hobbs|       0|
| karthikeyan|       0|
|  scarfiotti|       1|
|       irwin|       0|
|     angelis|       2|
|      alliot|       0|
|   magnussen|       0|
|      rosset|       0|
|    flaherty|       1|
|  stuppacher|       0|
|       terra|       0|
|    macdowel|       0|
|   hurtubise|       0|
|    blignaut|       0|
| alguersuari|       0|
|      holmes|       0|
|    cantrell|       0|
|lucienbonnet|       0|
+------------+--------+
only showing top 20 rows



In [None]:
val cumSum = sampleData.withColumn("cumulativeSum", sum(sampleData("Salary"))
             .over( Window.partitionBy("Role").orderBy("Salary")))

In [13]:
window = Window.partitionBy('driverId', 'circuitId').orderBy('raceId')


In [46]:
(work_df.groupby('driverRef')
 .sum('win')
 .filter(work_df.driverRef == 'hamilton')
 .show()
)

+---------+--------+
|driverRef|sum(win)|
+---------+--------+
| hamilton|      62|
+---------+--------+



In [41]:
work_df.printSchema()

root
 |-- statusId: integer (nullable = true)
 |-- raceId: integer (nullable = true)
 |-- driverId: integer (nullable = true)
 |-- constructorId: integer (nullable = true)
 |-- number: integer (nullable = true)
 |-- grid: integer (nullable = true)
 |-- position: integer (nullable = true)
 |-- positionOrder: integer (nullable = true)
 |-- points: double (nullable = true)
 |-- laps: integer (nullable = true)
 |-- milliseconds: integer (nullable = true)
 |-- fastestLap: integer (nullable = true)
 |-- rank: integer (nullable = true)
 |-- fastestLapTime: string (nullable = true)
 |-- fastestLapSpeed: string (nullable = true)
 |-- constructorRef: string (nullable = true)
 |-- name: string (nullable = true)
 |-- nationality: string (nullable = true)
 |-- driverRef: string (nullable = true)
 |-- number: integer (nullable = true)
 |-- forename: string (nullable = true)
 |-- surname: string (nullable = true)
 |-- nationality: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- rou

In [5]:
from pyspark.sql.functions import pandas_udf, PandasUDFType

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

In [6]:
df.show()

+---+----+
| id|   v|
+---+----+
|  1| 1.0|
|  1| 2.0|
|  2| 3.0|
|  2| 5.0|
|  2|10.0|
+---+----+



In [8]:
df.select(sf.col('v')).show()

+----+
|   v|
+----+
| 1.0|
| 2.0|
| 3.0|
| 5.0|
|10.0|
+----+



In [38]:
@pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
def subtract_mean(pdf):
    # pdf is a pandas.DataFrame
    v = pdf.v
    return pdf.assign(v = v - v.mean())

In [39]:
df.groupby("id").apply(subtract_mean)

DataFrame[id: bigint, v: double]

In [37]:
df.show()

+---+----+
| id|   v|
+---+----+
|  1| 1.0|
|  1| 2.0|
|  2| 3.0|
|  2| 5.0|
|  2|10.0|
+---+----+



In [41]:
from pyspark.sql.functions import pandas_udf, PandasUDFType

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

@pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
def subtract_mean(pdf):
    # pdf is a pandas.DataFrame
    v = pdf.v
    return pdf.assign(v=v - v.mean())

#df.groupby("id").apply(subtract_mean).show()
# +---+----+
# | id|   v|
# +---+----+
# |  1|-0.5|
# |  1| 0.5|
# |  2|-3.0|
# |  2|-1.0|
# |  2| 4.0|
# +---+----+

In [42]:
df.show()

+---+----+
| id|   v|
+---+----+
|  1| 1.0|
|  1| 2.0|
|  2| 3.0|
|  2| 5.0|
|  2|10.0|
+---+----+



In [45]:
@pandas_udf(df.schema, PandasUDFType.GROUPED_MAP)
# Input/output are both a pandas.DataFrame
def subtract_mean(pdf):
    return pdf.assign(v=pdf.v - pdf.v.mean())

df.groupby('id').apply(subtract_mean)

DataFrame[id: bigint, v: double]

In [44]:
df.show()

+---+----+
| id|   v|
+---+----+
|  1| 1.0|
|  1| 2.0|
|  2| 3.0|
|  2| 5.0|
|  2|10.0|
+---+----+



In [None]:
@log_step
def get_wins_per_circuit(df):
    return df.assign(
        win_per_circuit = lambda df: df.groupby(['driverId','circuitId'])['win']
        .transform(lambda df: df.shift(1).cumsum())
    )

In [16]:
def get_wins_per_circuit(df):
    window = Window.partitionBy('driverId', 'circuitId').orderBy('raceId')
    
    return (df.withColumn('win_per_circuit')
            , sum(df('win'))
            .over(window)
            )  

In [12]:
from pyspark.sql.functions import pandas_udf, PandasUDFType

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

@pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
def subtract_mean(pdf):
    # pdf is a pandas.DataFrame
    v = pdf.v
    return pdf.assign(v=v - v.mean())

df.groupby("id").apply(subtract_mean).show()

Py4JJavaError: An error occurred while calling o53.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 44 in stage 7.0 failed 1 times, most recent failure: Lost task 44.0 in stage 7.0 (TID 136, localhost, executor driver): org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$3.applyOrElse(PythonRunner.scala:486)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$3.applyOrElse(PythonRunner.scala:475)
	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
	at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:178)
	at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:122)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.EOFException
	at java.io.DataInputStream.readInt(DataInputStream.java:392)
	at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:159)
	... 26 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:365)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3383)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2544)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2544)
	at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3364)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3363)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2544)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2758)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:254)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:291)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$3.applyOrElse(PythonRunner.scala:486)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$3.applyOrElse(PythonRunner.scala:475)
	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
	at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:178)
	at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:122)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
Caused by: java.io.EOFException
	at java.io.DataInputStream.readInt(DataInputStream.java:392)
	at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:159)
	... 26 more


In [82]:
work_df.withColumn('win', sf.col('positionOrder') == 1)

DataFrame[statusId: int, raceId: int, driverId: int, constructorId: int, number: int, grid: int, position: int, positionOrder: int, points: double, laps: int, milliseconds: int, fastestLap: int, rank: int, fastestLapTime: string, fastestLapSpeed: string, constructorRef: string, name: string, nationality: string, driverRef: string, number: int, forename: string, surname: string, nationality: string, year: int, round: int, circuitId: int, name: string, date: timestamp, status: string, mean_position_overall: double, mean_position_percircuit: double, win: boolean]

In [78]:
def get_wins(df):
    return df.withColumn('win', sf.col('positionOrder') == 1)

In [None]:
df.filter(df.X == 1).toPandas()

In [None]:
def average_finishing(df):
    window = Window.partitionBy('driverId')

    return df.withColumn('mean_position_overall', sf.avg('positionOrder').over(window))

In [70]:
def demean_circuit_from_mean(df):
    window = Window.partitionBy('driverId', 'circuitId')
    
    (df.withColumn('demean_circuit'),
     sf.col('mean_position_overall') - sf.avg('positionOrder').over(window)
    )

In [None]:
# (airlines
#  .filter(~sf.isnan(sf.col('arr_delay')))
#  .withColumn("demean_arry_delay", 
#              sf.col('arr_delay') - sf.avg('arr_delay').over(window))
#  .toPandas()
# )

In [69]:
(work_df.groupby(sf.col('driverRef'), sf.col('circuitId')).agg(sf.mean(work_df.positionOrder))
 .filter(sf.col('driverRef') == 'hamilton')
 .withColumn('demean_position', sf.col('avg(positionOrder)') - )
 .toPandas()
)

Unnamed: 0,driverRef,circuitId,avg(positionOrder),demean_position
0,hamilton,13,9.0,18.0
1,hamilton,6,4.363636,8.727273
2,hamilton,70,2.25,4.5
3,hamilton,11,4.272727,8.545455
4,hamilton,18,7.181818,14.363636
5,hamilton,12,5.8,11.6
6,hamilton,24,6.222222,12.444444
7,hamilton,35,4.75,9.5
8,hamilton,16,6.5,13.0
9,hamilton,20,8.25,16.5


In [None]:
#df.filter(df.X == 1).toPandas()

In [51]:
(work_df.filter(work_df.driverRef == 'hamilton')
 .select('driverRef', 'positionOrder')
 .withColumn()
)

Unnamed: 0,driverRef,positionOrder
0,hamilton,3
1,hamilton,2
2,hamilton,2
3,hamilton,2
4,hamilton,2
5,hamilton,1
6,hamilton,1
7,hamilton,3
8,hamilton,3
9,hamilton,9


In [39]:
work_df.groupby('driverId').sum('positionOrder').show()

+--------+------------------+
|driverId|sum(positionOrder)|
+--------+------------------+
|     496|               126|
|     463|                73|
|     471|                 9|
|     148|               608|
|     833|               204|
|     737|                14|
|     623|                20|
|     540|                42|
|     392|                30|
|     243|               897|
|     516|                75|
|      31|               810|
|     580|               157|
|     451|                22|
|     251|               367|
|     137|              2075|
|      85|               539|
|     808|               833|
|     458|                21|
|      65|              2129|
+--------+------------------+
only showing top 20 rows



In [36]:
work_df.groupby('driverRef').agg(sf.min(work_df.mean_position_overall)).toPandas()

Unnamed: 0,driverRef,min(mean_position_overall)
0,godia,11.214286
1,hobbs,9.000000
2,pollet,13.200000
3,karthikeyan,18.458333
4,scarfiotti,11.846154
5,magnussen,15.400000
6,irwin,11.100000
7,alliot,16.547826
8,angelis,11.981818
9,rosset,15.266667


In [6]:
work_df.columns

['statusId',
 'raceId',
 'driverId',
 'constructorId',
 'number',
 'grid',
 'position',
 'positionOrder',
 'points',
 'laps',
 'milliseconds',
 'fastestLap',
 'rank',
 'fastestLapTime',
 'fastestLapSpeed',
 'constructorRef',
 'name',
 'nationality',
 'driverRef',
 'number',
 'forename',
 'surname',
 'nationality',
 'year',
 'round',
 'circuitId',
 'name',
 'date',
 'status']

In [None]:
@log_step
def average_finishing(df):
    return df.assign(
        mean_position_till_date = lambda df: df.groupby('driverId')['positionOrder']
        .transform(lambda df: df.shift(1).expanding().mean())
    )

In [7]:
from pyspark.sql.window import Window

In [23]:
#pyspark
def average_finishing(df):
    window = Window.partitionBy('driverId')
    
    return df.withColumn('mean_position_overall', sf.avg('positionOrder').over(window))

In [8]:
def diff_between_races(df):
    window = Window.partitionBy('driverId').orderBy('raceId')
    
    return df.withColumn('difference_races', sf.col('positionOrder') - sf.lag(sf.col('positionOrder')).over(window))

In [9]:
x = diff_between_races(work_df)

In [16]:
#x.select('difference_races','driverRef').show()

(x.filter(x.driverRef == 'hamilton')
 .select('difference_races','driverRef').describe().show()
)

+-------+--------------------+---------+
|summary|    difference_races|driverRef|
+-------+--------------------+---------+
|  count|                 207|      208|
|   mean|-0.08695652173913043|     null|
| stddev|   8.773424894623286|     null|
|    min|                 -23| hamilton|
|    max|                  23| hamilton|
+-------+--------------------+---------+



In [18]:
window = Window.partitionBy('driverId').orderBy('raceId')

In [None]:
work_df.select('')

In [184]:
def average_finishing(df):
    window =  Window.partitionBy('driverId')
    
    return df.withColumn('mean_position_till_date', sf.avg('positionOrder').over(window))

In [None]:
sf.lag('temperature').over(mid_window))

In [183]:
work_df.show()

+--------+------+--------+-------------+------+----+--------+-------------+------+----+------------+----------+----+--------------+---------------+--------------+-----------+-----------+--------------+------+--------+--------------+-----------+----+-----+---------+------------------+-------------------+--------------+
|statusId|raceId|driverId|constructorId|number|grid|position|positionOrder|points|laps|milliseconds|fastestLap|rank|fastestLapTime|fastestLapSpeed|constructorRef|       name|nationality|     driverRef|number|forename|       surname|nationality|year|round|circuitId|              name|               date|        status|
+--------+------+--------+-------------+------+----+--------+-------------+------+----+------------+----------+----+--------------+---------------+--------------+-----------+-----------+--------------+------+--------+--------------+-----------+----+-----+---------+------------------+-------------------+--------------+
|       1|   833|     642|           51|

In [180]:
def average_finishing_test(df):
    window =  Window.partitionBy('driverId')
    
    return df.withColumn('lagged_by_one', sf.lag('positionOrder').over(window))

In [181]:
x = average_finishing_test(work_df)

AnalysisException: 'Window function lag(positionOrder#6326, 1, null) requires window to be ordered, please add ORDER BY clause. For example SELECT lag(positionOrder#6326, 1, null)(value_expr) OVER (PARTITION BY window_partition ORDER BY window_ordering) from table;'

In [None]:
df.filter(df.X == 1).toPandas()

In [163]:
x.filter(x.driverRef == 'hamilton').select('mean_position').show()

+-----------------+
|    mean_position|
+-----------------+
|5.485576923076923|
|5.485576923076923|
|5.485576923076923|
|5.485576923076923|
|5.485576923076923|
|5.485576923076923|
|5.485576923076923|
|5.485576923076923|
|5.485576923076923|
|5.485576923076923|
|5.485576923076923|
|5.485576923076923|
|5.485576923076923|
|5.485576923076923|
|5.485576923076923|
|5.485576923076923|
|5.485576923076923|
|5.485576923076923|
|5.485576923076923|
|5.485576923076923|
+-----------------+
only showing top 20 rows



In [None]:
@log_step
def average_finishing(df):
    return df.assign(
        mean_position_till_date = lambda df: df.groupby('driverId')['positionOrder']
        .transform(lambda df: df.shift(1).expanding().mean())
    )

# Get the driver with the max driverID

In [38]:
work_df.select('driverRef','driverId').toPandas()

Unnamed: 0,driverRef,driverId
0,hamilton,1
1,heidfeld,2
2,rosberg,3
3,alonso,4
4,kovalainen,5
5,nakajima,6
6,bourdais,7
7,raikkonen,8
8,kubica,9
9,glock,10


In [44]:
max_driverId = work_df.agg({"driverId": "max"}).collect()[0][0]
max_driverId

843

In [70]:
work_df.filter(work_df.driverId == max_driverId).show()

+--------+------+--------+-------------+--------+------+----+--------+------------+-------------+------+----+----+------------+----------+----+--------------+---------------+--------------+----------+-----------+--------------------+----+---------------+------+----+--------+-------+----------+-------------+--------------------+----+-----+---------+--------------------+-------------------+--------+--------------------+------+
|statusId|raceId|driverId|constructorId|resultId|number|grid|position|positionText|positionOrder|points|laps|time|milliseconds|fastestLap|rank|fastestLapTime|fastestLapSpeed|constructorRef|      name|nationality|                 url| _c5|      driverRef|number|code|forename|surname|       dob|  nationality|                 url|year|round|circuitId|                name|               date|    time|                 url|status|
+--------+------+--------+-------------+--------+------+----+--------+------------+-------------+------+----+----+------------+----------+----

In [53]:
max_driver_RDD = work_df.filter(work_df.driverId == max_driverId)

In [69]:
max_driver_RDD.select('driverRef','driverId').collect()[0][0]

'brendon_hartley'