# UDF and Caching Lab

In this lab, we will be working with data about all international soccer games ever played. 

First run the following imports for later use, then read in the data.

In [1]:
from pyspark.sql.functions import pandas_udf, PandasUDFType

SparkSession available as 'spark'.


In [2]:
from pyspark.sql.types import StructType, StructField, StringType, DateType, IntegerType, BooleanType

In [3]:
games = spark.read.csv("hdfs:///data/soccer_games.csv",header=True, inferSchema=True)
games = games.repartition(100)

View the data using `show`.

### Expand some acronyms in the Tournament column using UDFs

We have provided this dictionary for the lookups below.

In [4]:
acronyms = {'UEFA': "Union of European Football Associations",
            "FIFA":"Fédération Internationale de Football Association",
            "AFC":"Asian Football Confederation", 
            "CONCACAF":"Confederation of North, Central American and Caribbean Association Football"
           }

Write a Python function `slow` that takes one argument, a string:
- Split the string into words on spaces using `split`
- For each word, if it is in the `acronyms` dictionary, use the value there; otherwise leave it as is
- Return the expanded words joined back together into a single string

In [5]:
def slow(row):
    old = []
    for i in row.split(' '):
        if i in acronyms:
            old.append(acronyms[i])
        else:
            old.append(i)
    return ' '.join(old)

Test your Python function below. It should return "Fédération Internationale de Football Association World Cup".

In [6]:
print(slow("FIFA World Cup"))

Fédération Internationale de Football Association World Cup
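
When a plain Python function runs as a UDF, Spark passes `None` for null column values, and calling `split` on `None` would raise an error. The sketch below shows one possible null-safe variant of the same logic. The name `expand_acronyms` and the `None` guard are assumptions added for illustration and are not part of the lab solution.

```python
# Lookup table copied from the lab so that the block is self-contained.
acronyms = {
    "UEFA": "Union of European Football Associations",
    "FIFA": "Fédération Internationale de Football Association",
    "AFC": "Asian Football Confederation",
    "CONCACAF": "Confederation of North, Central American and Caribbean Association Football",
}


def expand_acronyms(text):
    """Hypothetical null-safe variant of `slow`."""
    if text is None:
        # Pass nulls through unchanged instead of failing on None.split
        return None
    # dict.get(word, word) yields the expansion for a known acronym
    # and the word itself otherwise.
    return " ".join(acronyms.get(word, word) for word in text.split(" "))


print(expand_acronyms("FIFA World Cup"))
print(expand_acronyms("Balkan Cup"))
print(expand_acronyms(None))
```

Whether the guard is needed depends on whether the `tournament` column actually contains nulls, which the lab does not state.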

Register your Python function as a UDF using `spark.udf.register`.

In [7]:
slow_udf = spark.udf.register("expand",slow)

Call your UDF on the `tournament` column using the `select` method of `games`.

In [8]:
expanded = games.select(slow_udf('tournament'))

Use `distinct` and `show` to view the results.

In [9]:
expanded.distinct().show(100,False)

+------------------------------------------------------------------------------------------------------+
|expand(tournament)                                                                                    |
+------------------------------------------------------------------------------------------------------+
|United Arab Emirates Friendship Tournament                                                            |
|Balkan Cup                                                                                            |
|CECAFA Cup                                                                                            |
|Nordic Championship                                                                                   |
|Copa Juan Pinto Durán                                                                                 |
|Viva World Cup                                                                                        |
|West African Cup                                      

Now we are going to write the same UDF using pandas. We have written the Python function for you this time.

In [10]:
def fast(series):
    # Split on whitespace, expand known acronyms, and re-join the words with single spaces
    return series.str.split().apply(lambda y: ' '.join([acronyms.get(x,x) for x in y]))

Make a vectorized UDF using `pandas_udf`.

In [11]:
fast_udf = pandas_udf(fast,StringType())

Call your UDF on the `tournament` column using the `select` method of `games`.

In [12]:
expanded = games.select(fast_udf('tournament'))

Use `distinct` and `show` to view the results.

In [13]:
expanded.distinct().show(100,False)

+------------------------------------------------------------------------------------------------------+
|fast(tournament)                                                                                      |
+------------------------------------------------------------------------------------------------------+
|Rous Cup                                                                                              |
|Merlion Cup                                                                                           |
|Indonesia Tournament                                                                                  |
|Copa América qualification                                                                            |
|UDEAC Cup                                                                                             |
|Fédération Internationale de Football Association World Cup qualification                             |
|Windward Islands Tournament                                             
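
A vectorized UDF receives a whole `pandas.Series` and must return a Series of the same length, so its logic can be checked locally with plain pandas before it is wrapped in `pandas_udf`. The following is a minimal sketch under that assumption. The name `expand_series`, the reduced dictionary, and the sample values are made up for illustration, and the words are joined with a single space.

```python
import pandas as pd

# Subset of the lab dictionary, repeated here so the block runs on its own.
acronyms = {
    "FIFA": "Fédération Internationale de Football Association",
    "AFC": "Asian Football Confederation",
}


def expand_series(series):
    # Split each string on whitespace, expand known acronyms,
    # and re-join the words with single spaces.
    return series.str.split().apply(
        lambda words: " ".join(acronyms.get(w, w) for w in words)
    )


sample = pd.Series(["FIFA World Cup", "Rous Cup", "AFC Asian Cup"])
result = expand_series(sample)
print(result.tolist())
```

Checking a handful of values this way makes separator or lookup mistakes visible before the function is run on the cluster.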

### Find the games in which a team scored the most goals, per tournament
We have already written the vectorized Python function for you. See if you can follow what it is doing.

In [14]:
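def most_goals(df):
    # Highest score by either team in each game
    df = df.assign(game_max = df[['home_score','away_score']].max(axis=1))
    # idxmax returns an index label, so look the row up with .loc
    most = df.loc[df.game_max.idxmax()]
    most = most.drop('game_max')
    # Return the selected row as a one-row DataFrame
    return most.to_frame().T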
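
To follow what the function does, it can help to run the same steps on a small pandas DataFrame that stands in for one tournament's group. This is only a local sketch. `most_goals_local` is a hypothetical copy of the logic that uses label-based `.loc`, and the toy rows are invented.

```python
import pandas as pd


def most_goals_local(df):
    # Row-wise maximum of the two score columns: the most goals by one team.
    df = df.assign(game_max=df[["home_score", "away_score"]].max(axis=1))
    # idxmax gives the index label of the first row with the largest game_max.
    top = df.loc[df["game_max"].idxmax()].drop("game_max")
    # The selected row is a Series; turn it back into a one-row DataFrame.
    return top.to_frame().T


# Toy data for a single tournament (values invented for illustration).
group = pd.DataFrame({
    "home_team": ["A", "B", "C"],
    "away_team": ["B", "C", "A"],
    "home_score": [1, 0, 2],
    "away_score": [3, 7, 2],
})

best = most_goals_local(group)
print(best)
```

In the grouped-map UDF, Spark calls the function once per group and concatenates the returned one-row frames into the result.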

Next we need to define the return type: a `StructType` that lists every column and its type.

We've done the first few columns for you

In [15]:
gamesType = StructType([StructField('date',DateType()),
                        StructField('home_team',StringType()),
                        StructField('away_team',StringType()),
                        StructField('home_score',IntegerType()),
                        StructField('away_score',IntegerType()),
                        StructField('tournament',StringType()),
                        StructField('city',StringType()),
                        StructField('country',StringType()),
                        StructField('neutral',IntegerType())
                       ])

Make a `GROUPED_MAP` vectorized UDF.

In [16]:
most_goals_udf = pandas_udf(most_goals, gamesType, PandasUDFType.GROUPED_MAP )

Use `groupby` and `apply` to find, for each tournament, the game in which one team scored the most goals.
Use `show` to view the results.

In [17]:
games.groupby('tournament').apply(most_goals_udf).show()

+----------+-----------+--------------------+----------+----------+--------------------+--------------+--------------------+-------+
|      date|  home_team|           away_team|home_score|away_score|          tournament|          city|             country|neutral|
+----------+-----------+--------------------+----------+----------+--------------------+--------------+--------------------+-------+
|2000-10-07|  Australia|      Korea Republic|         2|         4|United Arab Emira...|         Dubai|United Arab Emirates|      1|
|1948-06-06|    Hungary|             Romania|         9|         0|          Balkan Cup|      Budapest|             Hungary|      0|
|2000-06-25|Netherlands|              Serbia|         6|         1|           UEFA Euro|     Rotterdam|         Netherlands|      0|
|2001-12-09|     Uganda|            Djibouti|        10|         1|          CECAFA Cup|        Kigali|              Rwanda|      1|
|1954-08-15|    Finland|              Sweden|         1|        10| N

## Caching and Repartitioning

In [18]:
from pyspark.sql.functions import year

In [19]:
games = games.withColumn('date', year(games.date))

First we are going to run a `groupby` on the data as is.

In [20]:
games.groupby('tournament').min('date').show()

+--------------------+---------+
|          tournament|min(date)|
+--------------------+---------+
|United Arab Emira...|     1994|
|          Balkan Cup|     1929|
|           UEFA Euro|     1960|
|          CECAFA Cup|     1973|
| Nordic Championship|     1924|
|Copa Juan Pinto D...|     1963|
|      Viva World Cup|     2006|
|    West African Cup|     1982|
|         UNIFFAC Cup|     1999|
|            Gold Cup|     1991|
|British Championship|     1884|
|          King's Cup|     1968|
|Oceania Nations C...|     1994|
|Copa Carlos Dittborn|     1962|
|Copa Bernardo O'H...|     1955|
|African Cup of Na...|     1957|
|          Baltic Cup|     1928|
|         Dunhill Cup|     1997|
|          Mundialito|     1980|
|   Copa Oswaldo Cruz|     1950|
+--------------------+---------+
only showing top 20 rows

In [21]:
games.groupby('tournament').count().show()

+--------------------+-----+
|          tournament|count|
+--------------------+-----+
|United Arab Emira...|   31|
|          Balkan Cup|   87|
|           UEFA Euro|  286|
|          CECAFA Cup|  620|
| Nordic Championship|  283|
|Copa Juan Pinto D...|   16|
|      Viva World Cup|   56|
|    West African Cup|   54|
|         UNIFFAC Cup|   15|
|            Gold Cup|  296|
|British Championship|  501|
|          King's Cup|  234|
|Oceania Nations C...|   70|
|Copa Carlos Dittborn|   16|
|Copa Bernardo O'H...|   10|
|African Cup of Na...|  638|
|          Baltic Cup|   87|
|         Dunhill Cup|   15|
|          Mundialito|    7|
|   Copa Oswaldo Cruz|   16|
+--------------------+-----+
only showing top 20 rows

Now repartition on `tournament` using `repartition` with 100 partitions, then call `cache`.

In [22]:
games = games.repartition(100,'tournament').cache()
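
Repartitioning on a column hash-partitions the rows by that column, so every row for a given tournament lands in the same partition, and a later `groupby('tournament')` can typically aggregate without another shuffle. Note also that `cache` is lazy: the data is only materialized the first time an action runs. The pure-Python sketch below imitates the partitioning idea only. It uses `zlib.crc32` as a stand-in hash (Spark uses its own hash function), `partition_for` is a hypothetical helper, and the sample rows are made up.

```python
import zlib
from collections import defaultdict

NUM_PARTITIONS = 4  # the lab uses 100; a small number keeps the demo readable


def partition_for(key, num_partitions=NUM_PARTITIONS):
    # Stand-in for a hash partitioner: the same key always maps to the same id.
    return zlib.crc32(key.encode("utf-8")) % num_partitions


# (tournament, year) pairs invented for illustration.
rows = [
    ("Balkan Cup", 1929), ("UEFA Euro", 1960), ("Balkan Cup", 1948),
    ("Gold Cup", 1991), ("UEFA Euro", 2000), ("Gold Cup", 2002),
]

# Assign every row to a partition based on its tournament.
partitions = defaultdict(list)
for tournament, year in rows:
    partitions[partition_for(tournament)].append((tournament, year))

# Each partition can compute min(year) per tournament on its own,
# because no tournament is split across partitions.
min_year = {}
for part in partitions.values():
    for tournament, year in part:
        min_year[tournament] = min(year, min_year.get(tournament, year))

print(min_year)
```

The same reasoning applies to `count`: once the rows for a key are co-located, a per-partition result is already the final result for that key.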

Run the same code as before.

In [23]:
games.groupby('tournament').min('date').show()

+--------------------+---------+
|          tournament|min(date)|
+--------------------+---------+
|Malta Internation...|     1988|
|United Arab Emira...|     1994|
|          Balkan Cup|     1929|
|           UEFA Euro|     1960|
|   AFC Challenge Cup|     2006|
|African Cup of Na...|     1961|
| Nordic Championship|     1924|
|          CECAFA Cup|     1973|
|             OSN Cup|     2013|
|   Tournoi de France|     1988|
|Copa Juan Pinto D...|     1963|
|           UDEAC Cup|     1984|
|    West African Cup|     1982|
|      Viva World Cup|     2006|
|         UNIFFAC Cup|     1999|
|   NAFU Championship|     1947|
|CONCACAF Champion...|     1965|
|            Gold Cup|     1991|
|Indonesia Tournament|     1970|
|           Nehru Cup|     1982|
+--------------------+---------+
only showing top 20 rows

In [24]:
games.groupby('tournament').count().show()

+--------------------+-----+
|          tournament|count|
+--------------------+-----+
|Malta Internation...|   53|
|United Arab Emira...|   31|
|          Balkan Cup|   87|
|           UEFA Euro|  286|
|   AFC Challenge Cup|  100|
|African Cup of Na...| 1558|
| Nordic Championship|  283|
|          CECAFA Cup|  620|
|             OSN Cup|    4|
|   Tournoi de France|   10|
|Copa Juan Pinto D...|   16|
|           UDEAC Cup|   65|
|    West African Cup|   54|
|      Viva World Cup|   56|
|         UNIFFAC Cup|   15|
|   NAFU Championship|    7|
|CONCACAF Champion...|   21|
|            Gold Cup|  296|
|Indonesia Tournament|   75|
|           Nehru Cup|   90|
+--------------------+-----+
only showing top 20 rows