<a href="https://colab.research.google.com/github/bonioloff/note_pyspark/blob/main/PySpark_Note.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# How Spark works
- Background on workers, executors, and cores: https://stackoverflow.com/questions/32621990/what-are-workers-executors-cores-in-spark-standalone-cluster

In [1]:
# install the pyspark package
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.2.1.tar.gz (281.4 MB)
Collecting py4j==0.10.9.3
  Downloading py4j-0.10.9.3-py2.py3-none-any.whl (198 kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.2.1-py2.py3-none-any.whl size=281853642 sha256=8f84150baab007bfd1210ae405ccc79e10e52ff52e381376f0595fad7ad7f661
  Stored in directory: /root/.cache/pip/wheels/9f/f5/07/7cd8017084dce4e93e84e92efd1e1d5334db05f2e83bcef74f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.3 pyspark-3.2.1


In [2]:
from pyspark import SparkConf, SparkContext

In [3]:
# Initialize spark context
conf = SparkConf().setMaster("local").setAppName("Spark App")
sc = SparkContext(conf=conf)

# Resilient Distributed Dataset (RDD) Interface

An RDD can be built from many kinds of data sources:
- Manual entry (an in-memory collection)
- Text files from the local filesystem, S3, HDFS, etc.
- Hive
- JDBC
- Cassandra
- HBase
- ElasticSearch
- JSON, etc.

In [4]:
# Manual way to create RDD
rdd = sc.parallelize([1,2,3,4])

In [5]:
# Create RDD from text file
rdd = sc.textFile("sample_data/california_housing_test.csv")

In [6]:
# Show the first 5 rows
rdd.take(5)
# rdd.collect()

['"longitude","latitude","housing_median_age","total_rooms","total_bedrooms","population","households","median_income","median_house_value"',
 '-122.050000,37.370000,27.000000,3885.000000,661.000000,1537.000000,606.000000,6.608500,344700.000000',
 '-118.300000,34.260000,43.000000,1510.000000,310.000000,809.000000,277.000000,3.599000,176500.000000',
 '-117.810000,33.780000,27.000000,3589.000000,507.000000,1484.000000,495.000000,5.793400,270500.000000',
 '-118.360000,33.820000,28.000000,67.000000,15.000000,49.000000,11.000000,6.135900,330000.000000']

In [7]:
# flatMap(): split each line on commas and flatten all the pieces into one RDD
rdd.flatMap(lambda x: x.split(",")).take(10)

['"longitude"',
 '"latitude"',
 '"housing_median_age"',
 '"total_rooms"',
 '"total_bedrooms"',
 '"population"',
 '"households"',
 '"median_income"',
 '"median_house_value"',
 '-122.050000']

## Basic Operations with RDD
These are the basic RDD operations:
- map
- flatMap
- filter
- distinct
- sample
- union
- intersection
- subtract
- cartesian

Operations on key-value pair RDDs:
- reduceByKey()
- groupByKey()
- sortByKey()
- keys()
- values()

In [8]:
# map(): split each line and keep (median_income, median_house_value) as a (key, value) pair
income_vs_homevalue = rdd.map(lambda x: x.split(",")).map(lambda x: (x[-2], x[-1]))
income_vs_homevalue.take(5)

[('"median_income"', '"median_house_value"'),
 ('6.608500', '344700.000000'),
 ('3.599000', '176500.000000'),
 ('5.793400', '270500.000000'),
 ('6.135900', '330000.000000')]

In [9]:
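# reduceByKey(): combine the values that share a key.
# The values are still strings here, so + concatenates them (see the last output row) instead of adding.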
income_vs_homevalue.reduceByKey(lambda x, y: x + y).take(5)

[('"median_income"', '"median_house_value"'),
 ('6.608500', '344700.000000'),
 ('3.599000', '176500.000000'),
 ('5.793400', '270500.000000'),
 ('6.135900', '330000.000000106300.000000225000.000000')]

In [10]:
# keys()
income_vs_homevalue.keys().take(5)

['"median_income"', '6.608500', '3.599000', '5.793400', '6.135900']

In [11]:
# values()
income_vs_homevalue.values().take(5)

['"median_house_value"',
 '344700.000000',
 '176500.000000',
 '270500.000000',
 '330000.000000']

In [12]:
# sortByKey(): the keys are strings, so the order is lexicographic, not numeric
income_vs_homevalue.sortByKey().take(5)

[('"median_income"', '"median_house_value"'),
 ('0.499900', '500001.000000'),
 ('0.536000', '162500.000000'),
 ('0.536000', '275000.000000'),
 ('0.536000', '87500.000000')]

# SparkSQL: DataFrame (DF) / Datasets

Compared with RDDs, DataFrames offer several benefits:
- They contain Row objects
- They can be queried with SQL
- They can have a schema
- They can be exposed over JDBC/ODBC to external tools such as Tableau

Instead of a SparkContext, DataFrames are created through a SparkSession.

__Datasets__ are typed DataFrames. They are used in statically typed languages such as Scala and Java; Python is dynamically typed, so in PySpark we use DataFrames.

Typed means that the data type of each column is declared explicitly and checked at compile time.


In [13]:
from pyspark.sql import SparkSession, Row, functions as fun

In [14]:
spark = SparkSession.builder.appName("SparkSQL").getOrCreate()

In [15]:
inputData = spark.read.csv("sample_data/california_housing_test.csv", header=True, inferSchema=True)

In [16]:
inputData.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)



In [17]:
inputData.select("longitude").show(5)

+---------+
|longitude|
+---------+
|  -122.05|
|   -118.3|
|  -117.81|
|  -118.36|
|  -119.67|
+---------+
only showing top 5 rows



In [18]:
inputData.filter(inputData.total_rooms > 1000).show(5)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|  -122.05|   37.37|              27.0|     3885.0|         661.0|    1537.0|     606.0|       6.6085|          344700.0|
|   -118.3|   34.26|              43.0|     1510.0|         310.0|     809.0|     277.0|        3.599|          176500.0|
|  -117.81|   33.78|              27.0|     3589.0|         507.0|    1484.0|     495.0|       5.7934|          270500.0|
|  -119.67|   36.33|              19.0|     1241.0|         244.0|     850.0|     237.0|       2.9375|           81700.0|
|  -119.56|   36.51|              37.0|     1018.0|         213.0|     663.0|     204.0|       1.6635|           67000.0|
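+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
only showing top 5 rows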

In [19]:
inputData.groupBy("households").count().show(5)

+----------+-----+
|households|count|
+----------+-----+
|     305.0|    4|
|     558.0|    2|
|     496.0|    1|
|     596.0|    5|
|     299.0|    4|
+----------+-----+
only showing top 5 rows



In [20]:
inputData.select(inputData.households, inputData.median_income * 0.8).show(3)

+----------+---------------------+
|households|(median_income * 0.8)|
+----------+---------------------+
|     606.0|               5.2868|
|     277.0|   2.8792000000000004|
|     495.0|    4.634720000000001|
+----------+---------------------+
only showing top 3 rows



In [21]:
inputData.groupBy("households").agg(fun.round(fun.avg("median_income"), 2)).show(5)

+----------+----------------------------+
|households|round(avg(median_income), 2)|
+----------+----------------------------+
|     305.0|                        4.49|
|     558.0|                         3.4|
|     496.0|                        4.23|
|     596.0|                        3.08|
|     299.0|                        2.89|
+----------+----------------------------+
only showing top 5 rows



In [22]:
# Read text
textData = spark.read.text("sample_data/README.md")

In [23]:
# Split each line into words on runs of non-word characters, one word per row
words = textData.select(fun.explode(fun.split(textData.value, "\\W+")).alias("word"))

In [24]:
# Count the occurrences of each word, most frequent first
words.groupBy("word").count().sort("count", ascending=False).show()

+-------------+-----+
|         word|count|
+-------------+-----+
|             |   23|
|        https|    4|
|           is|    4|
|vega_datasets|    3|
|          the|    3|
|     Anscombe|    3|
|          com|    3|
|            a|    3|
|           en|    2|
|         copy|    2|
|           in|    2|
|         data|    2|
|          was|    2|
|      housing|    2|
|         json|    2|
|     anscombe|    2|
|          csv|    2|
|           of|    2|
|         wiki|    2|
|          org|    2|
+-------------+-----+
only showing top 20 rows



# Using DataFrames to analyze the MovieLens dataset

In [25]:
!curl -O http://files.grouplens.org/datasets/movielens/ml-1m.zip
!unzip ml-1m.zip

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 5778k  100 5778k    0     0  17.9M      0 --:--:-- --:--:-- --:--:-- 17.9M
Archive:  ml-1m.zip
   creating: ml-1m/
  inflating: ml-1m/movies.dat        
  inflating: ml-1m/ratings.dat       
  inflating: ml-1m/README            
  inflating: ml-1m/users.dat         


In [44]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, LongType

# Prepare rating dataset (fields in the .dat files are separated by "::")
rating_schema = StructType([StructField("UserID", StringType()),
                            StructField("MovieID", StringType()),
                            StructField("Ratings", IntegerType()),
                            StructField("Timestamp", LongType())])  # seconds since the Unix epoch

rating = spark.read.csv("ml-1m/ratings.dat", sep="::", schema=rating_schema)

# Prepare movie dataset
movie_schema = StructType([StructField("MovieID", StringType()),
                           StructField("Title", StringType()),
                           StructField("Genres", StringType())])

movie = spark.read.csv("ml-1m/movies.dat", sep="::", schema=movie_schema)


In [45]:
rating.show(4)

+------+-------+-------+---------+
|UserID|MovieID|Ratings|Timestamp|
+------+-------+-------+---------+
|     1|   1193|      5|978300760|
|     1|    661|      3|978302109|
|     1|    914|      3|978301968|
|     1|   3408|      4|978300275|
+------+-------+-------+---------+
only showing top 4 rows



In [47]:
movie.show(5, truncate=False)

+-------+----------------------------------+----------------------------+
|MovieID|Title                             |Genres                      |
+-------+----------------------------------+----------------------------+
|1      |Toy Story (1995)                  |Animation|Children's|Comedy |
|2      |Jumanji (1995)                    |Adventure|Children's|Fantasy|
|3      |Grumpier Old Men (1995)           |Comedy|Romance              |
|4      |Waiting to Exhale (1995)          |Comedy|Drama                |
|5      |Father of the Bride Part II (1995)|Comedy                      |
+-------+----------------------------------+----------------------------+
only showing top 5 rows



In [48]:
# Count the ratings per movie and sort so the most-rated movies come first
top_rated = rating.groupBy("MovieID").count().sort("count", ascending=False)
top_rated.show(5)

+-------+-----+
|MovieID|count|
+-------+-----+
|   2858| 3428|
|    260| 2991|
|   1196| 2990|
|   1210| 2883|
|    480| 2672|
+-------+-----+
only showing top 5 rows



To send data to every executor once and keep it available there for the whole job, we can use sc.broadcast().

Each executor then holds an identical, read-only copy of the variable.