# PySpark: Advanced Functions

In [0]:
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType, StringType, DoubleType, DecimalType
from pyspark.sql import Window
import sys

## 1. Unions

In [0]:
# Both player files are read with the same settings: ';' as the field
# separator, a header row, ISO-8859-1 encoding, and column types inferred
# by Spark.
player_csv_options = {
    'inferSchema': 'true',
    'header': 'true',
    'sep': ';',
    'encoding': 'ISO-8859-1',
}

df_match_1 = (
    spark.read
         .options(**player_csv_options)
         .csv('/FileStore/tables/Jugadores_01.csv')
)

df_match_2 = (
    spark.read
         .options(**player_csv_options)
         .csv('/FileStore/tables/Jugadores_02.csv')
)

In [0]:
# Render the first file's DataFrame with the notebook's display helper.
display(df_match_1)

Equipo,_c1,PLAYER_ID,Nombre jugador,Tiempo en juego,Posesiones individuales,Remates,Remates al arco,Efectividad en remates,Asistencias de remate,Pases,% efectividad,Pases exitosos,Centros en jugada,Centros exitosos en jugada,Pelotas recuperadas,Pelotas perdidas,Fueras de juego,Faltas cometidas,Faltas sufridas,Tarjetas amarillas,Tarjetas rojas,AGE,Minutos jugados (promedio),Goles (jugadores),Corners,Bloqueos,Intercepciones,Despejes,Centros en jugada derecha,Centros en jugada exitosos derecha,Centros en jugada izquerda,Centros en jugada exitosos izquierda,Goles que abren el partido,Remates de cabeza,Remates con pierna derecha,Remates con pierna izquierda,Tiempo en juego (segundos),Pases recibidos,Asistencias,Entradas al ultimo tercio,Entradas exitosas al ultimo tercio,Entradas al area,Entradas exitosas al area,Pases hacia adelante,Pases laterales,Pases hacia atrás,Robos completados,Robos - ultimo tercio,Asistencias de remate - remates fuera del área,Asistencias de remate - remates en el tercio izquierdo del área,Asistencias de remate - remates en zona de peligro,Asistencias de remate - remates en el tercio derecho del área,Toques en el área / partido
Chicago,Titulares,374431,Leonardo B.,955,25,0,0,,0,24,63%,15,0,0,13,9,0,0,0,0,0,36,90',0,0,0,0,0,0,0,0,0,0,0,0,0,5.731,5,0,2,0,0,0,15,8,0,0,0,0,0,0,0,0
,,1157449,Alex V.,955,52,0,0,,0,40,70%,28,3,0,17,22,0,2,0,1,0,20,90',0,0,6,0,3,3,0,0,0,0,0,0,0,5.731,22,0,2,1,6,1,17,14,8,0,0,0,0,0,0,1
,,916002,Gonzalo E.,955,52,2,0,0%,0,43,74%,32,1,0,20,17,0,0,0,1,0,22,90',0,0,3,2,4,0,0,1,0,0,0,0,2,5.731,21,0,4,3,2,1,20,13,9,3,0,0,0,0,0,0
,,375567,Bruno B.,955,45,0,0,,0,36,81%,29,0,0,17,13,1,2,0,1,0,31,90',0,0,3,1,7,0,0,0,0,0,0,0,0,5.731,24,0,4,2,2,0,15,18,3,0,0,0,0,0,0,0
,,655947,Rafael M.,955,48,0,0,,0,41,83%,34,0,0,20,11,0,0,0,0,0,30,90',0,0,2,0,9,0,0,0,0,0,0,0,0,5.731,27,0,3,1,1,0,17,22,2,2,0,0,0,0,0,0
,,1104380,Tomas C.,955,45,2,1,50%,0,33,64%,21,2,0,5,19,0,2,4,1,0,21,90',0,1,1,0,0,2,0,0,0,0,0,2,0,5.731,33,0,2,1,4,1,8,16,6,0,0,0,0,0,0,1
,,727119,Fernando Z.,955,42,0,0,,0,35,69%,24,0,0,12,14,0,0,4,0,0,28,90',0,0,2,0,2,0,0,0,0,0,0,0,0,5.731,25,0,6,3,2,0,12,19,3,3,0,0,0,0,0,0
,,373695,Matias F.,955,43,0,0,,0,41,76%,31,0,0,9,10,0,0,0,0,0,33,90',0,0,3,1,2,0,0,0,0,0,0,0,0,5.731,32,0,5,2,1,0,15,21,4,1,0,0,0,0,0,1
,,538006,Marcelo E.,757,39,0,0,,0,29,66%,19,0,0,9,17,0,1,2,0,0,32,75',0,0,2,1,4,0,0,0,0,0,0,0,0,4.541,27,0,5,2,1,1,11,13,5,0,0,0,0,0,0,0
,,822970,Gabriel E.,796,31,0,0,,0,19,84%,16,3,0,5,8,0,1,4,1,0,27,79',0,0,1,1,1,0,0,3,0,0,0,0,0,4.779,24,0,2,2,3,1,5,12,2,1,0,0,0,0,0,0


In [0]:
# Stack the rows of both frames into one.
# NOTE(review): union() pairs columns by position, not by name — this assumes
# both CSV files list their columns in the same order; confirm.
full_df = df_match_1.union(df_match_2)

# Report the combined size as (rows,columns).
n_rows = full_df.count()
n_cols = len(full_df.columns)
print("New shape: ({0},{1})".format(n_rows, n_cols))

## 2. Missing values

In [0]:
# One-row frame holding, for every column of full_df, the number of null
# entries: when() yields a non-null marker only for null cells, and count()
# counts the non-null markers.
df_NaN_values = full_df.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in full_df.columns])

# Fetch the single result row ONCE. Calling collect() inside the comprehension
# would re-run the whole aggregation for every column.
null_counts = df_NaN_values.first()

# Keep only the columns that actually contain nulls.
cols_with_nulls = [name for name, n_nulls in zip(df_NaN_values.columns, null_counts) if n_nulls > 0]

# Transposed pandas view: one line per column, with its null count.
df_NaN_values.select(cols_with_nulls).toPandas().T

Unnamed: 0,0
Equipo,52
_c1,48
Efectividad en remates,31


## 3. Ranking

In [0]:
from pyspark import SQLContext
from pyspark.sql.window import Window
from pyspark.sql.types import DateType
import pyspark.sql.functions as F
from pyspark.sql.functions import rank, col ,concat, lit , isnan, when, count

# Sample purchases as (name, date as 'YYYY-MM-DD' string, product, price).
shopping_data = [
    ('Juanjo', '2018-10-10', 'Paint', 80),
    ('Adri', '2018-04-02', 'Ladder', 20),
    ('Hugo', '2018-06-22', 'Stool', 20),
    ('Juanjo', '2018-12-09', 'Vacuum', 40),
    ('Adri', '2018-07-12', 'Bucket', 5),
    ('Hugo', '2018-02-18', 'Gloves', 5),
    ('Juanjo', '2018-03-03', 'Brushes', 30),
    ('Juanjo', '2018-09-26', 'Sandpaper', 10),
]

# Build the frame and cast the string dates to DateType.
df = (spark.createDataFrame(shopping_data, ['name', 'date', 'product', 'price'])
           .withColumn('date', F.col('date').cast(DateType())))

# Window over the whole frame (no partitioning), most expensive first.
# The frame runs from the first row up to the current one; the named
# Window constants replace the magic integers (-sys.maxsize, 0).
w0 = (Window.orderBy(df['price'].desc())
            .rowsBetween(Window.unboundedPreceding, Window.currentRow))

# rank() gives equal prices the same rank, so `rank <= 4` can keep more than
# four rows when prices tie at the cut-off.
top = df.select('*', rank().over(w0).alias('rank')).filter(col('rank') <= 4)

display(top)


name,date,product,price,rank
Juanjo,2018-10-10,Paint,80,1
Juanjo,2018-12-09,Vacuum,40,2
Juanjo,2018-03-03,Brushes,30,3
Adri,2018-04-02,Ladder,20,4
Hugo,2018-06-22,Stool,20,4


## 4. Join

In [0]:
# Folder that holds the CSV inputs for the join examples.
home_dir = "/FileStore/tables/"

# Both files have a header row; column types are inferred by Spark.
# cache() marks each frame to be kept in memory for faster processing.
movies = (
    spark.read
         .options(header=True, inferSchema=True)
         .csv(home_dir + "movies.csv")
         .cache()
)

ratings = (
    spark.read
         .options(header=True, inferSchema=True)
         .csv(home_dir + "ratings2.csv")
         .cache()
)

In [0]:
# Count rows per (movieId, userId) pair and show the pairs whose count is
# not 1, i.e. pairs that appear more than once in ratings.
pair_counts = ratings.groupBy("movieId", "userId").count()
pair_counts.filter("count != 1").show()

In [0]:
# Per movie: how many ratings it received and their average value.
ratings_agg = ratings.groupBy("movieId").agg(
    F.count("movieId").alias("count"),
    F.avg("rating").alias("avg_rating"),
)
ratings_agg.show()

In [0]:
# Attach titles to the per-movie aggregates, keep movies with more than 150
# ratings, and show the ten with the highest average rating.
# NOTE(review): the SQL version of this query in section 5 uses `>= 150`
# rather than `> 150` — confirm which threshold is intended.
(
    ratings_agg.alias("t1")
    .join(movies.alias("t2"), F.col("t1.movieId") == F.col("t2.movieId"))
    .filter("count > 150")
    .orderBy(F.col("avg_rating").desc())
    .select("t1.movieId", "title", "avg_rating", "count")
    .limit(10)
    .show()
)

## 5. SQL

In [0]:
# List the tables/views currently visible to the session. Uses spark.sql
# explicitly, as the query cell later in this section does, instead of
# relying on a bare `sql` name that this notebook never defines.
spark.sql("show tables").show()

In [0]:
# Expose both DataFrames to SQL under the view names "movies" and "ratings".
movies.createOrReplaceTempView("movies")
ratings.createOrReplaceTempView("ratings")

# List the tables again so the newly registered views show up. Uses spark.sql
# explicitly, as the query cell below does, instead of a bare `sql` name
# that this notebook never defines.
spark.sql("show tables").show()

In [0]:
# SQL counterpart of the DataFrame join: average rating and rating count per
# movie, restricted to movies with at least 150 ratings, ten best averages first.
top_rated_sql = """ select t1.movieId, t1.title, 
                     avg(t2.rating) avg_rating, 
                     count(1) rating_count 
              from movies t1 join ratings t2 on t1.movieId = t2.movieId 
              group by t1.movieId, t1.title
              having rating_count >= 150
              order by avg_rating desc 
              limit 10
"""
spark.sql(top_rated_sql).show()

In [0]:
# Unregister the "movies" temporary view.
# NOTE(review): the "ratings" view registered together with it is not dropped
# in this cell — confirm whether that is intentional.
spark.catalog.dropTempView("movies")