## Using Spark Cluster with shared data in Docker

### Handle Spark DataFrames

In [7]:
!pip install pyspark==3.5.3 pandas setuptools packaging



#### Download the CSV file to the local directory (shared with Spark)

In [None]:
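import urllib.request
import zipfile
from os import remove

# Download the dataset archive into the current (shared) directory
url = 'https://www.kaggle.com/api/v1/datasets/download/chaitanyahivlekar/large-movie-dataset'
urllib.request.urlretrieve(url, 'movies.zip')

# Extract the CSV next to the notebook so the Spark workers can read it
with zipfile.ZipFile('movies.zip', 'r') as zip_ref:
    zip_ref.extractall('./')

# Delete the archive once its contents are extracted
remove('movies.zip')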
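
The download step can also be wrapped in a small helper that fails early if the server returns something other than a zip archive. This is only a sketch using the standard library: the function name `download_and_extract` and its parameters are hypothetical and not part of the original notebook.

```python
import tempfile
import urllib.request
import zipfile
from pathlib import Path


def download_and_extract(url, dest_dir="."):
    """Download a zip archive from `url`, extract it into `dest_dir`,
    and return the names of the extracted members."""
    dest = Path(dest_dir)
    dest.mkdir(parents=True, exist_ok=True)
    # Keep the archive in a temporary directory so it is removed
    # automatically, even if extraction raises an error.
    with tempfile.TemporaryDirectory() as tmp:
        archive_path = Path(tmp) / "download.zip"
        urllib.request.urlretrieve(url, str(archive_path))
        # Guard against responses that are not zip archives (e.g. an error page).
        if not zipfile.is_zipfile(archive_path):
            raise ValueError(f"{url} did not return a zip archive")
        with zipfile.ZipFile(archive_path) as archive:
            archive.extractall(dest)
            return archive.namelist()
```

Calling `download_and_extract(url, "./")` with the `url` defined above would be expected to leave the extracted files in the shared directory without keeping `movies.zip` around.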

#### Connect to Spark Cluster and create Spark Session

In [3]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("MyAppSDF") \
    .master("spark://spark-master:7077") \
    .getOrCreate()  



Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/04/09 15:13:33 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


#### Read data
##### Create a Spark DataFrame from a CSV file

To create a Spark DataFrame from a CSV file, use the `spark.read.csv` method. Setting `inferSchema=True` lets Spark infer the type of each column, but it requires an extra pass over the data, so it can be slow on large files.

In [4]:
sdf = spark.read.csv("movies_dataset.csv", header=True, inferSchema=True)

                                                                                

To see the schema, use the `.printSchema()` method.

In [None]:
sdf.printSchema()

Show the first rows of the Spark DataFrame with the `.show()` method.

In [None]:
sdf.show()

Count the total number of rows.

In [None]:
sdf.count()

#### Filter data


In [None]:
from pyspark.sql.functions import col

# Keep only rows with a rating of 5 or higher
sdf.filter(col("Rating") >= 5).show()

In [None]:


# Combine conditions with & (and); each comparison needs its own parentheses
sdf.filter((col("Rating") >= 5) & col("Genre").contains("Comedy")).show()


In [None]:
# Combine conditions with | (or): rows whose genre contains Action or Comedy
sdf.filter(col("Genre").contains("Action") | col("Genre").contains("Comedy")).show()

In [None]:
# Keep the filtered DataFrame in a variable for reuse in later cells
sdf_top = sdf.filter((col("Rating") >= 5) & col("Genre").contains("Comedy"))
sdf_top.show()


#### Transform data

In [5]:
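from pyspark.sql import functions as sf

# Recreate sdf_top here so this cell does not depend on the earlier filter cell having run
sdf_top = sdf.filter((sf.col("Rating") >= 5) & sf.col("Genre").contains("Comedy"))

# Count how many top ratings each movie received
sdf_top.groupBy("Movie_name").agg(
    sf.count("*").alias("total_ratings"),
).show()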

In [None]:
# Number of ratings and average rating per comedy
sdf.filter(sf.col("Genre").contains("Comedy")). \
    groupBy("Movie_name").agg(
        sf.count("*").alias("total_ratings"),
        sf.avg("Rating").alias("avg_rating")
    ).show()

In [None]:
# Same aggregation plus the rating sum, sorted by number of ratings (descending)
sdf.filter(sf.col("Genre").contains("Comedy")). \
    groupBy("Movie_name").agg(
        sf.count("*").alias("total_ratings"),
        sf.avg("Rating").alias("avg_rating"),
        sf.sum("Rating").alias("sum_rating")
    ). \
    orderBy("total_ratings", ascending=False).show()

In [6]:
from pyspark.sql import functions as sf

df = sdf.filter( sf.col('Genre').contains("Comedy") ). \
    groupBy("Movie_name").agg(
        sf.count("*").alias("total_ratings"),
        sf.avg("Rating").alias("avg_rating"),
        sf.sum("Rating").alias("sum_rating")
    ). \
    filter( sf.col("avg_rating") > 4). \
    orderBy("total_ratings", ascending=False). \
    toPandas()

df

                                                                                

| | Movie_name | total_ratings | avg_rating | sum_rating |
|---|---|---|---|---|
| 0 | Forrest Gump (1994) | 81491 | 4.048011 | 329876.5 |
| 1 | Pulp Fiction (1994) | 79672 | 4.188912 | 333739.0 |
| 2 | Fargo (1996) | 47823 | 4.111421 | 196620.5 |
| 3 | Princess Bride, The (1987) | 37863 | 4.129097 | 156340.0 |
| 4 | Monty Python and the Holy Grail (1975) | 37723 | 4.147655 | 156462.0 |
| ... | ... | ... | ... | ... |
| 585 | Living on Love (1937) | 1 | 5.000000 | 5.0 |
| 586 | Born in Absurdistan (1999) | 1 | 4.500000 | 4.5 |
| 587 | Road to Kabul (2012) | 1 | 4.500000 | 4.5 |
| 588 | Junga (2018) | 1 | 5.000000 | 5.0 |
