In [4]:
import os

In [5]:
# Spark release to install, written as its release directory name
# (for example 'spark-3.0.2'). The value is also exported as the SPARK_VERSION
# environment variable so the shell commands in the install cell can expand it.
spark_version = "spark-3.1.1"
os.environ["SPARK_VERSION"] = spark_version

In [6]:
# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www-us.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

0% [Working]            Get:1 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
Ign:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Get:3 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ Packages [52.7 kB]
Get:4 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
Get:5 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease [15.9 kB]
Hit:6 http://archive.ubuntu.com/ubuntu bionic InRelease
Get:7 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
Ign:8 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Hit:9 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release
Hit:10 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Hit:11 http://ppa.launchpad.net/cran/libgit2/ubuntu bionic InRelease
Get:14 http://archive.ubuntu.com/ubuntu bionic-backp

In [7]:
# Export the JDK and Spark locations for the tools started later.
# NOTE(review): SPARK_HOME assumes the Spark archive was unpacked in /content
# (the working directory of the install cell) — confirm if run elsewhere.
os.environ.update(
    JAVA_HOME="/usr/lib/jvm/java-11-openjdk-amd64",
    SPARK_HOME=f"/content/{spark_version}-bin-hadoop2.7",
)

In [8]:
# Make the Spark installation importable from Python via findspark.
# NOTE(review): presumably init() locates Spark through the SPARK_HOME
# environment variable set in the previous cell — confirm against the findspark docs.
# No SparkSession is started here; the session is created in a later cell.
import findspark
findspark.init()

In [9]:
#import packages

from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.types import StructType,StructField,StringType, DateType,IntegerType

# we are going to use this to time our queries.
import time

# Build the session used by every later cell, under the application name
# "SparkSQL"; getOrCreate() hands back an already-running session if one exists.
spark = (
    SparkSession.builder
    .appName("SparkSQL")
    .getOrCreate()
)

In [40]:
# Load the titles CSV from the S3 bucket: register the URL with the Spark
# context, then read the fetched copy, treating its first row as column names.
# No schema is given or inferred, so columns arrive as strings.
from pyspark import SparkFiles
url = "https://2u-data-curriculum-team.s3.amazonaws.com/dataviz-netflix/titles_basic.csv"
spark.sparkContext.addFile(url)
df = spark.read.csv(
    SparkFiles.get("titles_basic.csv"),
    sep=",",
    header=True,
)

In [41]:

# Register the titles DataFrame as the temp view `names` so it can be queried
# through spark.sql (replaces any existing view with that name).
df.createOrReplaceTempView('names')

In [66]:
# Read every row back from the `names` view and discard the listed columns.
# NOTE(review): despite its name, clean_professions_df holds title rows
# (tconst, titletype, originaltitle, runtimeminutes); the name is kept because
# later cells reference it.
df2 = spark.sql("SELECT * FROM names")
clean_professions_df = df2.drop(
    "genres", "primarytitle", "isadult",
    "startyear", "endyear", "_c0",
)
clean_professions_df.show()


+---------+---------+--------------------+--------------+
|   tconst|titletype|       originaltitle|runtimeminutes|
+---------+---------+--------------------+--------------+
|tt0439997|    movie|           500 Almas|         105.0|
|tt0439999|tvSpecial|           80s Mania|          50.0|
|tt0440003|    movie|         A1 tou tiao|          95.0|
|tt0440004|  tvMovie| AD/BC: A Rock Opera|          30.0|
|tt0440008|  tvMovie|Abbamania: We Say...|          50.0|
|tt0440016|    movie|       Ah ma yau nan|          93.0|
|tt0440022|  tvMovie|        Al atardecer|          66.0|
|tt0440035|  tvMovie|      L'amour en pen|          52.0|
|tt0440067|    movie|      Bau lit do see|          99.0|
|tt0440078|  tvMovie|  The Band Aid Story|          95.0|
|tt0440084|  tvMovie|A Beachcombers Ch...|         120.0|
|tt0440149|    video|Blink 182: Punk P...|          61.0|
|tt0440154|  tvMovie|Boogie special: 5...|          26.0|
|tt0440155|  tvMovie|Boogie special: M...|          28.0|
|tt0440157|tvS

In [67]:
# Load the ratings CSV from the S3 bucket the same way as the titles file:
# register the URL with the Spark context, then read the fetched copy with a
# header row. No schema is given or inferred, so columns arrive as strings.
from pyspark import SparkFiles
url = "https://2u-data-curriculum-team.s3.amazonaws.com/dataviz-netflix/ratings.csv"
spark.sparkContext.addFile(url)
df2 = spark.read.csv(
    SparkFiles.get("ratings.csv"),
    sep=",",
    header=True,
)

In [68]:
# Register the ratings DataFrame as the temp view `ratings` so it can be
# queried through spark.sql (replaces any existing view with that name).
df2.createOrReplaceTempView('ratings')

In [69]:
# Read every row back from the `ratings` view and keep everything except the
# vote count.
df2 = spark.sql("SELECT * FROM ratings")
clean_ratings_df = df2.drop("numvotes")
clean_ratings_df.show()

+---------+-------------+
|   tconst|averagerating|
+---------+-------------+
|tt0214461|          7.4|
|tt0214659|          6.0|
|tt0214878|          6.1|
|tt0215244|          3.0|
|tt0215402|          8.1|
|tt0215423|          8.1|
|tt0215458|          5.6|
|tt0215785|          5.3|
|tt0215972|          5.5|
|tt0216247|          8.1|
|tt0216705|          7.0|
|tt0217135|          5.9|
|tt0217143|          6.5|
|tt0217196|          7.4|
|tt0217629|          7.0|
|tt0217769|          5.8|
|tt0217824|          6.3|
|tt0217836|          2.5|
|tt0218182|          6.3|
|tt0218440|          6.0|
+---------+-------------+
only showing top 20 rows



In [70]:
# Inner-join the title rows to their ratings on the shared `tconst` key;
# titles with no matching rating are left out of the result.
complete_df = clean_professions_df.join(clean_ratings_df, on="tconst", how="inner")
complete_df.show()

+---------+---------+--------------------+--------------+-------------+
|   tconst|titletype|       originaltitle|runtimeminutes|averagerating|
+---------+---------+--------------------+--------------+-------------+
|tt0439997|    movie|           500 Almas|         105.0|          7.2|
|tt0440003|    movie|         A1 tou tiao|          95.0|          5.9|
|tt0440004|  tvMovie| AD/BC: A Rock Opera|          30.0|          7.4|
|tt0440016|    movie|       Ah ma yau nan|          93.0|          5.5|
|tt0440067|    movie|      Bau lit do see|          99.0|          5.4|
|tt0440078|  tvMovie|  The Band Aid Story|          95.0|          8.1|
|tt0440084|  tvMovie|A Beachcombers Ch...|         120.0|          7.0|
|tt0440149|    video|Blink 182: Punk P...|          61.0|          4.1|
|tt0440154|  tvMovie|Boogie special: 5...|          26.0|          1.0|
|tt0440155|  tvMovie|Boogie special: M...|          28.0|          7.7|
|tt0440157|tvSpecial|The British Comed...|         125.0|       

In [73]:
from datetime import timedelta
from pyspark.sql.functions import udf
def minutesToHourMinutes(minuteString = "0.0"):
  """Format a duration given in minutes as an ``H:MM`` string.

  Args:
    minuteString: Duration in minutes, as a numeric string such as ``"105.0"``
      (or a number). ``None``, an empty string and numeric ``0`` are treated
      as missing.

  Returns:
    ``"<hours>:<minutes>"`` with the minutes zero-padded to two digits, e.g.
    ``"1:45"`` for ``"105.0"``. Fractions of a minute are discarded. Hours are
    never folded into days, so ``"1500.0"`` gives ``"25:00"``. Missing or
    unparseable input gives the placeholder ``"0:0"``.
  """
  missingPlaceholder = "0:0"
  if not minuteString:
    return missingPlaceholder

  try:
    wholeMinutes = int(float(minuteString))
  except (TypeError, ValueError, OverflowError):
    # This function is wrapped as a Spark UDF: a single non-numeric, NaN or
    # infinite value must not raise and abort the whole job.
    return missingPlaceholder

  # Integer arithmetic instead of slicing str(timedelta): the sliced form
  # breaks for >= 24 h ("1 day, 1:00"), for sub-second fractions
  # ("1:30:59.400") and for negative values ("-1 day, 23:55").
  sign = "-" if wholeMinutes < 0 else ""
  hours, minutes = divmod(abs(wholeMinutes), 60)
  return f"{sign}{hours}:{minutes:02d}"

# Wrap the formatter as a string-returning Spark UDF and also register it under
# the name "minutesToHourMinutesUdf" for use in SQL queries.
minutesToHourMinutesUdf = udf(minutesToHourMinutes, StringType())
spark.udf.register("minutesToHourMinutesUdf", minutesToHourMinutesUdf)

# Add the formatted runtime next to the raw `runtimeminutes` column.
newTitleDf = complete_df.withColumn(
    "formatted_hhmm",
    minutesToHourMinutesUdf("runtimeminutes"),
)
newTitleDf.show()

+---------+---------+--------------------+--------------+-------------+--------------+
|   tconst|titletype|       originaltitle|runtimeminutes|averagerating|formatted_hhmm|
+---------+---------+--------------------+--------------+-------------+--------------+
|tt0439997|    movie|           500 Almas|         105.0|          7.2|          1:45|
|tt0440003|    movie|         A1 tou tiao|          95.0|          5.9|          1:35|
|tt0440004|  tvMovie| AD/BC: A Rock Opera|          30.0|          7.4|          0:30|
|tt0440016|    movie|       Ah ma yau nan|          93.0|          5.5|          1:33|
|tt0440067|    movie|      Bau lit do see|          99.0|          5.4|          1:39|
|tt0440078|  tvMovie|  The Band Aid Story|          95.0|          8.1|          1:35|
|tt0440084|  tvMovie|A Beachcombers Ch...|         120.0|          7.0|          2:00|
|tt0440149|    video|Blink 182: Punk P...|          61.0|          4.1|          1:01|
|tt0440154|  tvMovie|Boogie special: 5...| 