In [0]:
from pyspark.sql import SparkSession


In [0]:
spark = SparkSession.builder.appName("Operations").getOrCreate()

In [0]:
df = spark.read.csv('/FileStore/tables/wc2018players.csv',inferSchema=True,header=True)
df.show()

+---------+---+----+------------------+----------+----------+--------------------+------+------+
|     Team|  #|Pos.| FIFA Popular Name|Birth Date|Shirt Name|                Club|Height|Weight|
+---------+---+----+------------------+----------+----------+--------------------+------+------+
|Argentina|  3|  DF|TAGLIAFICO Nicolas|31.08.1992|TAGLIAFICO|      AFC Ajax (NED)|   169|    65|
|Argentina| 22|  MF|    PAVON Cristian|21.01.1996|     PAVÓN|CA Boca Juniors (...|   169|    65|
|Argentina| 15|  MF|    LANZINI Manuel|15.02.1993|   LANZINI|West Ham United F...|   167|    66|
|Argentina| 18|  DF|    SALVIO Eduardo|13.07.1990|    SALVIO|    SL Benfica (POR)|   167|    69|
|Argentina| 10|  FW|      MESSI Lionel|24.06.1987|     MESSI|  FC Barcelona (ESP)|   170|    72|
|Argentina|  4|  DF|  ANSALDI Cristian|20.09.1986|   ANSALDI|     Torino FC (ITA)|   181|    73|
|Argentina|  5|  MF|      BIGLIA Lucas|30.01.1986|    BIGLIA|      AC Milan (ITA)|   175|    73|
|Argentina|  7|  MF|       BAN

In [0]:
# 1.    Shows the names and height by adding 1 to the height column.

from pyspark.sql.functions import col
df.select(col('FIFA Popular Name'), col('Height')+1).show()

+------------------+------------+
| FIFA Popular Name|(Height + 1)|
+------------------+------------+
|TAGLIAFICO Nicolas|         170|
|    PAVON Cristian|         170|
|    LANZINI Manuel|         168|
|    SALVIO Eduardo|         168|
|      MESSI Lionel|         171|
|  ANSALDI Cristian|         182|
|      BIGLIA Lucas|         176|
|       BANEGA Ever|         176|
| MASCHERANO Javier|         175|
|      DYBALA Paulo|         178|
|     AGUERO Sergio|         173|
|   HIGUAIN Gonzalo|         185|
|    DI MARIA Angel|         179|
|  LO CELSO Giovani|         178|
|  MEZA Maximiliano|         181|
|      ACUNA Marcos|         173|
|CABALLERO Wilfredo|         187|
|   MERCADO Gabriel|         182|
|  OTAMENDI Nicolas|         182|
|       ROJO Marcos|         190|
+------------------+------------+
only showing top 20 rows



In [0]:
# 2.    shows the player name and simultaneously checks whether or not they have height >170
heightFilter = col('Height') > 170
df.withColumn("isTall", heightFilter)\
.select("FIFA Popular Name", "isTall").show()

+------------------+------+
| FIFA Popular Name|isTall|
+------------------+------+
|TAGLIAFICO Nicolas| false|
|    PAVON Cristian| false|
|    LANZINI Manuel| false|
|    SALVIO Eduardo| false|
|      MESSI Lionel| false|
|  ANSALDI Cristian|  true|
|      BIGLIA Lucas|  true|
|       BANEGA Ever|  true|
| MASCHERANO Javier|  true|
|      DYBALA Paulo|  true|
|     AGUERO Sergio|  true|
|   HIGUAIN Gonzalo|  true|
|    DI MARIA Angel|  true|
|  LO CELSO Giovani|  true|
|  MEZA Maximiliano|  true|
|      ACUNA Marcos|  true|
|CABALLERO Wilfredo|  true|
|   MERCADO Gabriel|  true|
|  OTAMENDI Nicolas|  true|
|       ROJO Marcos|  true|
+------------------+------+
only showing top 20 rows



In [0]:
# 3.    Show FIFA Popular Name and 0 or 1 depending on Height>170
heightFilter = col('Height') > 170
df.withColumn("isTall", heightFilter)\
.select("FIFA Popular Name", col("isTall").cast('integer')).show()

+------------------+------+
| FIFA Popular Name|isTall|
+------------------+------+
|TAGLIAFICO Nicolas|     0|
|    PAVON Cristian|     0|
|    LANZINI Manuel|     0|
|    SALVIO Eduardo|     0|
|      MESSI Lionel|     0|
|  ANSALDI Cristian|     1|
|      BIGLIA Lucas|     1|
|       BANEGA Ever|     1|
| MASCHERANO Javier|     1|
|      DYBALA Paulo|     1|
|     AGUERO Sergio|     1|
|   HIGUAIN Gonzalo|     1|
|    DI MARIA Angel|     1|
|  LO CELSO Giovani|     1|
|  MEZA Maximiliano|     1|
|      ACUNA Marcos|     1|
|CABALLERO Wilfredo|     1|
|   MERCADO Gabriel|     1|
|  OTAMENDI Nicolas|     1|
|       ROJO Marcos|     1|
+------------------+------+
only showing top 20 rows



In [0]:
# 4.    name of  shortest player
from pyspark.sql.functions import min
df.select(min('Height')).show()
# min height is 165
df.select('FIFA Popular Name').where(df['Height'] == 165).show()

+-----------+
|min(Height)|
+-----------+
|        165|
+-----------+

+-----------------+
|FIFA Popular Name|
+-----------------+
| QUINTERO Alberto|
|   YAHIA ALSHEHRI|
|  SHAQIRI Xherdan|
+-----------------+



In [0]:
# 5.    who is tallest of all. First we find the value of maximum height and then get the details of that player
from pyspark.sql.functions import max
df.select(max('Height')).show()
df.where(col('Height') == 201).show()

+-----------+
|max(Height)|
+-----------+
|        201|
+-----------+

+-------+---+----+-----------------+----------+----------+--------------+------+------+
|   Team|  #|Pos.|FIFA Popular Name|Birth Date|Shirt Name|          Club|Height|Weight|
+-------+---+----+-----------------+----------+----------+--------------+------+------+
|Croatia| 12|  GK|    KALINIC Lovre|03.04.1990|L. KALINIĆ|KAA Gent (BEL)|   201|    96|
+-------+---+----+-----------------+----------+----------+--------------+------+------+



In [0]:
# 6.    average height of the players in Argentina team.
from pyspark.sql.functions import avg
df.select(avg('Height')).show()
df.filter(df['Team'] == 'Argentina').select(avg('Height').alias('Argentina Avg Height')).show()

+-----------------+
|      avg(Height)|
+-----------------+
|182.4076086956522|
+-----------------+

+--------------------+
|Argentina Avg Height|
+--------------------+
|  178.43478260869566|
+--------------------+



### Spark Project 2: SQL

In [0]:

df1 = spark.read.csv('/FileStore/tables/movies.csv',inferSchema=True,header=True)
df1.show(3)
df2 = spark.read.csv('/FileStore/tables/ratings.csv',inferSchema=True,header=True)
df2.show(3)
df1.createOrReplaceTempView('movies')
df2.createOrReplaceTempView('ratings')

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
+-------+--------------------+--------------------+
only showing top 3 rows

+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     1|     31|   2.5|1260759144|
|     1|   1029|   3.0|1260759179|
|     1|   1061|   3.0|1260759182|
+------+-------+------+----------+
only showing top 3 rows



In [0]:
# 1. Find the list of oldest released movies
from pyspark.sql.functions import format_number,dayofmonth,hour,dayofyear,month,year,weekofyear,date_format
spark.sql("select substring(title,-5,4) as year, title from movies").show()

+----+--------------------+
|year|               title|
+----+--------------------+
|1995|    Toy Story (1995)|
|1995|      Jumanji (1995)|
|1995|Grumpier Old Men ...|
|1995|Waiting to Exhale...|
|1995|Father of the Bri...|
|1995|         Heat (1995)|
|1995|      Sabrina (1995)|
|1995| Tom and Huck (1995)|
|1995| Sudden Death (1995)|
|1995|    GoldenEye (1995)|
|1995|American Presiden...|
|1995|Dracula: Dead and...|
|1995|        Balto (1995)|
|1995|        Nixon (1995)|
|1995|Cutthroat Island ...|
|1995|       Casino (1995)|
|1995|Sense and Sensibi...|
|1995|   Four Rooms (1995)|
|1995|Ace Ventura: When...|
|1995|  Money Train (1995)|
+----+--------------------+
only showing top 20 rows



In [0]:
# 2. How many movies are released each year?
spark.sql("select substring(title,-5,4) as year, count(title) from movies group by year").show()
#spark.sql('select year, count(year) as movies_released from tbl group by year order by year').show()


+----+------------+
|year|count(title)|
+----+------------+
|1953|          36|
|1957|          41|
|1987|         152|
|1956|          32|
|1936|          18|
|2016|          65|
|2012|         222|
|1958|          31|
|1943|          21|
|1915|           1|
|1972|          48|
|1931|          14|
|1988|         164|
|1938|          14|
|1926|           6|
|1918|           1|
|1932|          14|
|011)|           3|
|1977|          61|
|1971|          55|
+----+------------+
only showing top 20 rows



In [0]:
# 3. How many number of movies are there for each rating?
spark.sql("select rating, count(rating) as num_movies from ratings group by rating order by rating").show()

+------+----------+
|rating|num_movies|
+------+----------+
|   0.5|      1101|
|   1.0|      3326|
|   1.5|      1687|
|   2.0|      7271|
|   2.5|      4449|
|   3.0|     20064|
|   3.5|     10538|
|   4.0|     28750|
|   4.5|      7723|
|   5.0|     15095|
+------+----------+



In [0]:
# 4. How many users have rated each movie?
spark.sql('select movieId, count(userId) as Num_Users from ratings group by movieId order by movieId').show()

+-------+---------+
|movieId|Num_Users|
+-------+---------+
|      1|      247|
|      2|      107|
|      3|       59|
|      4|       13|
|      5|       56|
|      6|      104|
|      7|       53|
|      8|        5|
|      9|       20|
|     10|      122|
|     11|       82|
|     12|       18|
|     13|        8|
|     14|       31|
|     15|       11|
|     16|       88|
|     17|       86|
|     18|       26|
|     19|       92|
|     20|       13|
+-------+---------+
only showing top 20 rows



In [0]:
# 5. What is the total rating for each movie?
spark.sql('select movieId, sum(rating) as total_rating from ratings group by movieId order by movieId').show()

+-------+------------+
|movieId|total_rating|
+-------+------------+
|      1|       956.5|
|      2|       364.0|
|      3|       186.5|
|      4|        31.0|
|      5|       183.0|
|      6|       404.0|
|      7|       174.0|
|      8|        19.0|
|      9|        63.0|
|     10|       421.0|
|     11|       302.5|
|     12|        51.5|
|     13|        31.5|
|     14|       107.0|
|     15|        25.5|
|     16|       347.5|
|     17|       337.5|
|     18|        85.5|
|     19|       239.0|
|     20|        33.0|
+-------+------------+
only showing top 20 rows



In [0]:
# 6. what is the average rating for each movie
spark.sql('select movieId, round(avg(rating),2) as avg_rating from ratings group by movieId order by movieId').show()

+-------+----------+
|movieId|avg_rating|
+-------+----------+
|      1|      3.87|
|      2|       3.4|
|      3|      3.16|
|      4|      2.38|
|      5|      3.27|
|      6|      3.88|
|      7|      3.28|
|      8|       3.8|
|      9|      3.15|
|     10|      3.45|
|     11|      3.69|
|     12|      2.86|
|     13|      3.94|
|     14|      3.45|
|     15|      2.32|
|     16|      3.95|
|     17|      3.92|
|     18|      3.29|
|     19|       2.6|
|     20|      2.54|
+-------+----------+
only showing top 20 rows

