## Using Spark Cluster with shared data in Docker

### Handle RDDs

This is a simple example of using Spark in a Docker container. Make sure you execute this notebook in code-server and not in a local notebook.

In [None]:
!pip install pyspark==3.5.3 pandas

#### Download the CSV file to the local directory (shared with Spark)

In [None]:
import urllib.request
import zipfile
from os import remove

url = 'https://www.kaggle.com/api/v1/datasets/download/chaitanyahivlekar/large-movie-dataset'
urllib.request.urlretrieve(url,'movies.zip')

with zipfile.ZipFile('movies.zip', 'r') as zip_ref:
    zip_ref.extractall('./')

remove('movies.zip')
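The download step above can also be wrapped in small helpers that report which files the archive contained, which is a convenient way to confirm the CSV file name used later. This is only a sketch: the function names `extract_archive` and `download_and_extract` are hypothetical, and the download itself is not exercised here.

```python
import tempfile
import urllib.request
import zipfile
from pathlib import Path


def extract_archive(zip_path, target_dir):
    """Extract a ZIP archive into target_dir and return the member names."""
    with zipfile.ZipFile(zip_path, "r") as zip_ref:
        zip_ref.extractall(target_dir)
        return zip_ref.namelist()


def download_and_extract(url, target_dir="."):
    """Download a ZIP archive to a temporary file, extract it, then clean up."""
    # The temporary directory (and the archive inside it) is deleted on exit,
    # so no explicit remove() call is needed.
    with tempfile.TemporaryDirectory() as tmp_dir:
        zip_path = Path(tmp_dir) / "download.zip"
        urllib.request.urlretrieve(url, str(zip_path))
        return extract_archive(zip_path, target_dir)
```

Calling `download_and_extract(url)` with the URL from the cell above should return the list of extracted file names, assuming the server delivers a valid ZIP archive.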

#### Connect to Spark Cluster and create Spark Context

In [None]:
from pyspark import SparkConf, SparkContext

conf = SparkConf().setAppName("MyApp").setMaster("spark://spark-master:7077")
sc = SparkContext(conf=conf)

#### Create RDD

To create an RDD from a CSV file read as plain text, we can use the `.textFile()` method of the Spark context. Each line of the file becomes one string element of the RDD.

In [None]:
rdd = sc.textFile("movies_dataset.csv")


#### Send dataset to Hadoop Cluster

In [None]:

rdd.saveAsTextFile('hdfs://hadoop-namenode:8020/user/cdn/movies_data')
del rdd

##### Reading data from the Hadoop cluster


In [None]:
rdd = sc.textFile('hdfs://hadoop-namenode:8020/user/cdn/movies_data')

print("First few lines of the RDD:")
rdd.take(10)

In the next step, we count the number of elements in the RDD using the `count()` method. Because each element is one line of the file, the total includes the header row.

In [None]:
print("Count total items")
rdd.count()

##### Filter data

Here, we take the first element of the RDD with the `.first()` method. It represents the first row of the CSV file, which is the header.

In [None]:
# header
header = rdd.first()
print(header)

Using the `header` variable and the `.filter()` method, we keep only the rows that differ from the header.

In [None]:
data_rdd = rdd.filter(lambda row: row != header)
data_rdd.take(5)

Next, we keep only the rows that contain the string "Comedy".

In [None]:
data_rdd = data_rdd.filter(lambda row: "Comedy" in row)
data_rdd.take(10)
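The substring test above matches "Comedy" anywhere in the row, so a movie whose title contains the word would also pass. A stricter predicate can look only at the genre field. The sketch below assumes the genre is the last comma-separated field, which is not confirmed by the notebook; the helper name `has_genre` and the sample rows are hypothetical.

```python
def has_genre(row, genre, genre_index=-1, sep=","):
    """Return True when the field at genre_index contains the given genre.

    Assumption: the genre is stored in the last column (genre_index=-1).
    """
    fields = row.split(sep)
    return genre in fields[genre_index]


# Hypothetical sample rows, for illustration only.
comedy_row = "0,user_1,Example Movie,4.0,Comedy"
drama_row = "1,user_2,The King of Comedy,5.0,Drama"

print(has_genre(comedy_row, "Comedy"))
print(has_genre(drama_row, "Comedy"))
```

With an RDD, the same predicate could be passed as `data_rdd.filter(lambda row: has_genre(row, "Comedy"))`.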

##### Transform data

Let's transform the data using the `.map()` method together with a `lambda` function. We split each string element on commas, which turns every row into a list of fields.

In [None]:
data_split_rdd = data_rdd.map(lambda row: row.split(','))
data_split_rdd.take(5)
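Splitting on every comma breaks rows whose fields contain commas inside quotes, such as some movie titles. One possible alternative is to parse each line with Python's standard `csv` module. This is a sketch only: the helper name `parse_csv_line` and the sample row are hypothetical, and whether the dataset actually contains quoted fields is an assumption.

```python
import csv


def parse_csv_line(line):
    """Parse one CSV line, keeping quoted fields that contain commas intact."""
    return next(csv.reader([line]))


# Hypothetical sample row with a comma inside a quoted title.
sample = '7,user_9,"Monsters, Inc.",4.5,Comedy'

print(sample.split(","))       # the title is split into two fields
print(parse_csv_line(sample))  # the title stays in one field
```

In the notebook, this could replace the lambda as `data_rdd.map(parse_csv_line)`.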

Then, we select the second and third fields of each list (indices 1 and 2), which hold the user ID and the movie name.

In [None]:
# Use a parameter name that does not shadow the built-in `list`
data = data_split_rdd.map(lambda fields: fields[1:3])
data.take(5)
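As a quick check of the slice bounds, the same expression can be tried on a plain Python list. The sample row below is hypothetical and only mirrors the assumed column order (index, user ID, movie name, rating, genre).

```python
# Hypothetical row after splitting on commas.
row = ["0", "user_1", "Example Movie", "4.0", "Comedy"]

# The slice starts at index 1 and stops before index 3.
selected = row[1:3]
print(selected)  # ['user_1', 'Example Movie']
```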

Finally, let's take the first 100 entries of the transformed RDD and load them into a pandas DataFrame.

In [None]:
import pandas as pd

df = pd.DataFrame(data.take(100), columns=['User_Id', 'Movie_Name'])
df

##### Stop the Spark context

In [None]:
# SparkContext has no close() method; stop() shuts down the context
sc.stop()