In [1]:
from typing import Optional, List
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf

# Initializing Spark

In [2]:
# Create (or, when this cell is re-run, reuse) the SparkContext `sc` and SparkSession `ss`.
AppName, Mode = "Python Spark Example", "local"
conf = SparkConf().setAppName(AppName).setMaster(Mode)
# getOrCreate() keeps the cell idempotent: constructing SparkContext(...) a second
# time fails because only one SparkContext may be active at a time.
sc = SparkContext.getOrCreate(conf=conf)
# The builder picks up the SparkContext that is already running.
ss = SparkSession.builder.getOrCreate()
print(f"SparkContext: {sc}")
print(f"SparkSession: {ss}")

SparkContext: <SparkContext master=local appName=Python Spark Example>
SparkSession: <pyspark.sql.session.SparkSession object at 0xffff626ff970>


# RDD

## Create RDD

In [3]:
# Build an RDD from an in-memory (ragged) list of lists via sc.parallelize,
# then cache it because the following cells reuse it many times.
data = [[1, 2, 3], [4, 5], [6], [7, 8, 9, 10]]
persist_data = sc.parallelize(data)
persist_data = persist_data.persist()  # persist() returns the same RDD, now marked for caching
print(f"Type: {type(persist_data)}, Data: {persist_data.collect()}")

Type: <class 'pyspark.rdd.RDD'>, Data: [[1, 2, 3], [4, 5], [6], [7, 8, 9, 10]]


## RDD Operations

In [4]:
# Basic actions on the cached RDD; each one runs a Spark job and returns to the driver.
# countByKey() uses each element's first item as its key, so on this
# list-of-lists data it counts the leading number of every inner list.
rdd = persist_data
print(f" first: {rdd.first()}")
print(f" take: {rdd.take(2)}")
print(f" count: {rdd.count()}")
print(f" collect: {rdd.collect()}")
print(f" countByKey: {rdd.countByKey()}")

 first: [1, 2, 3]
 take: [[1, 2, 3], [4, 5]]
 count: 4
 collect: [[1, 2, 3], [4, 5], [6], [7, 8, 9, 10]]
 countByKey: defaultdict(<class 'int'>, {1: 1, 4: 1, 6: 1, 7: 1})


In [5]:
# map: exactly one output element per input element; here every inner list
# comes back with each number incremented by 1.
result = persist_data.map(lambda nums: [n + 1 for n in nums])
print(f" Result: {result.collect()}")

 Result: [[2, 3, 4], [5, 6], [7], [8, 9, 10, 11]]


In [6]:
# flatMap: every input list expands into several output elements, so the
# result is one flat sequence of incremented numbers.
result = persist_data.flatMap(lambda nums: (n + 1 for n in nums))
print(f" Result: {result.collect()}")

 Result: [2, 3, 4, 5, 6, 7, 8, 9, 10, 11]


In [7]:
# filter: keep only the inner lists that contain the value 7.
result = persist_data.filter(lambda nums: nums.count(7) > 0)
print(f" Result: {result.collect()}")

 Result: [[7, 8, 9, 10]]


In [8]:
# foreach: run a function on every element of the dataset purely for its side
# effects, e.g. updating an Accumulator or writing to external storage.
# Integer accumulator starting at 0: tasks add to it, the driver reads .value.
accum = sc.accumulator(value=0)

class MyAccumulator:
    """Side-effect helpers for ``RDD.foreach`` that add to the module-level ``accum``."""

    @staticmethod
    def AccumulateByLength(nums: List[int]) -> None:
        """Add the number of elements in ``nums`` to ``accum``."""
        accum.add(len(nums))

    @staticmethod
    def AccumulateByFirstNum(nums: List[int]) -> None:
        """Add the first element of ``nums`` to ``accum``.

        An empty list contributes nothing instead of raising IndexError inside
        the Spark task.
        """
        if nums:
            accum.add(nums[0])

# The accumulator is cumulative across both passes:
#   pass 1 adds the lengths       3 + 2 + 1 + 4        -> 10
#   pass 2 adds the first items   10 + (1 + 4 + 6 + 7) -> 28
# Its .value is read here, on the driver.
persist_data.foreach(MyAccumulator.AccumulateByLength)
print(f" Result_1: {accum.value}")

persist_data.foreach(MyAccumulator.AccumulateByFirstNum)
print(f" Result_2: {accum.value}")

 Result_1: 10
 Result_2: 28


In [9]:
# map: Return a new distributed dataset formed by passing each element of the source through a function func.
# Passing Functions to Spark

class MyComputer:
    """Element-wise helpers passed to ``RDD.map``."""

    @staticmethod
    def plusone(nums: List[int]) -> List[int]:
        """Return a new list with every element of ``nums`` increased by 1."""
        return [n + 1 for n in nums]

    @staticmethod
    def minusone(nums: List[int]) -> List[int]:
        """Return a new list with every element of ``nums`` decreased by 1."""
        return [n - 1 for n in nums]

# Pass the class-level functions to map() by reference; each call yields a new RDD.
result_1, result_2 = (
    persist_data.map(func) for func in (MyComputer.plusone, MyComputer.minusone)
)
print(f" Result_1: {result_1.collect()}")
print(f" Result_2: {result_2.collect()}")

 Result_1: [[2, 3, 4], [5, 6], [7], [8, 9, 10, 11]]
 Result_2: [[0, 1, 2], [3, 4], [5], [6, 7, 8, 9]]


# Spark DataFrame

## Create DataFrame

In [10]:
# Prepare JSON-like records (one dict per person) from compact row tuples.
_people_columns = ("Name", "Age", "Gender", "Country")
_people_rows = (
    ("Jacky", 27, "Male", "Taiwan"),
    ("John", 32, "Male", "Taiwan"),
    ("Tom", 42, "Male", "Japan"),
    ("Mark", 16, "Male", "Taiwan"),
    ("Sandy", 23, "Female", "US"),
)
people_json = [dict(zip(_people_columns, row)) for row in _people_rows]

In [11]:
# Use rdd.toDF() to create a Spark DataFrame; the schema is inferred from the dicts.
# Keep the list and the RDD under separate names so this cell can be re-run:
# rebinding people_json to the RDD made a second run hand an RDD (not a local
# collection) to sc.parallelize().
people_rdd = sc.parallelize(people_json)
sample_df = people_rdd.toDF()
sample_df.printSchema()
sample_df.show()

root
 |-- Age: long (nullable = true)
 |-- Country: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Name: string (nullable = true)

+---+-------+------+-----+
|Age|Country|Gender| Name|
+---+-------+------+-----+
| 27| Taiwan|  Male|Jacky|
| 32| Taiwan|  Male| John|
| 42|  Japan|  Male|  Tom|
| 16| Taiwan|  Male| Mark|
| 23|     US|Female|Sandy|
+---+-------+------+-----+



In [12]:
# Create a DataFrame from people_json with an explicit schema instead of inference.
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
dataSchema = StructType([
    StructField(field_name, field_type, is_nullable)
    for field_name, field_type, is_nullable in (
        ('Name', StringType(), False),
        ('Age', IntegerType(), True),
        ('Gender', StringType(), True),
        ('Country', StringType(), True),
    )
])

df = ss.createDataFrame(people_json, schema=dataSchema)
df.printSchema()
df.show(truncate=False)

root
 |-- Name: string (nullable = false)
 |-- Age: integer (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Country: string (nullable = true)

+-----+---+------+-------+
|Name |Age|Gender|Country|
+-----+---+------+-------+
|Jacky|27 |Male  |Taiwan |
|John |32 |Male  |Taiwan |
|Tom  |42 |Male  |Japan  |
|Mark |16 |Male  |Taiwan |
|Sandy|23 |Female|US     |
+-----+---+------+-------+



## DataFrame Operations

In [13]:
# Select people from Taiwan who are older than 25
is_taiwan = df['Country'] == "Taiwan"
is_over_25 = df['Age'] > 25
(
    df.filter(is_taiwan & is_over_25)
      .select(df['Name'], df['Age'], df['Country'])
      .show()
)

+-----+---+-------+
| Name|Age|Country|
+-----+---+-------+
|Jacky| 27| Taiwan|
| John| 32| Taiwan|
+-----+---+-------+



In [14]:
# groupBy & where with functions
# NOTE(review): this import shadows Python's built-in sum/max/min for the rest of
# the notebook. Later cells use these bare names (col, count, min, max, sum, avg),
# so it is kept as-is; `import pyspark.sql.functions as F` would avoid the shadowing.
from pyspark.sql.functions import col, sum, avg, max, min, count

# Per-country age statistics
country_stats = df.groupBy("Country").agg(
    count("Country").alias("Count_Country"),
    min("Age").alias("Min_Age"),
    max("Age").alias("Max_Age"),
    sum("Age").alias("Sum_Age"),
    avg("Age").alias("Average_Age"),
)
country_stats.where(col("Average_Age") >= 10).show()

+-------+-------------+-------+-------+-------+-----------+
|Country|Count_Country|Min_Age|Max_Age|Sum_Age|Average_Age|
+-------+-------------+-------+-------+-------+-----------+
| Taiwan|            3|     16|     32|     75|       25.0|
|     US|            1|     23|     23|     23|       23.0|
|  Japan|            1|     42|     42|     42|       42.0|
+-------+-------------+-------+-------+-------+-----------+



In [15]:
# UDF
from pyspark.sql.functions import udf

class MyUdf:
    """Plain-Python helpers wrapped as Spark UDFs and used with pandas-on-Spark ``apply``."""

    @staticmethod
    def newname(s: Optional[str]) -> Optional[str]:
        """Return ``s`` prefixed with ``"NEW_"``; a null (``None``) input stays null."""
        if s is None:
            return None
        return f"NEW_{s}"

    @staticmethod
    def powerage(i: Optional[int]) -> Optional[int]:
        """Return ``i`` squared; a null (``None``) input stays null."""
        # Age is declared nullable, and without this guard None * None
        # raises TypeError inside the UDF.
        if i is None:
            return None
        return i * i

# Wrap the plain-Python helpers as UDFs, declaring each return type
newname_udf = udf(MyUdf.newname, returnType=StringType())
powerage_udf = udf(MyUdf.powerage, returnType=IntegerType())

# Apply the UDFs as two new columns
df1 = (
    df.withColumn("New Name", newname_udf(col("Name")))
      .withColumn("Power Age", powerage_udf(col("Age")))
)
df1.show()

+-----+---+------+-------+---------+---------+
| Name|Age|Gender|Country| New Name|Power Age|
+-----+---+------+-------+---------+---------+
|Jacky| 27|  Male| Taiwan|NEW_Jacky|      729|
| John| 32|  Male| Taiwan| NEW_John|     1024|
|  Tom| 42|  Male|  Japan|  NEW_Tom|     1764|
| Mark| 16|  Male| Taiwan| NEW_Mark|      256|
|Sandy| 23|Female|     US|NEW_Sandy|      529|
+-----+---+------+-------+---------+---------+



In [16]:
# A PySpark DataFrame has no apply(), but the pandas API on Spark does:
# convert df and use Series.apply with the same helper functions.
import pyspark.pandas as ps

psdf = ps.DataFrame(df)
print(f"Type: {type(psdf)} \n {psdf.head(1)}")

# One new column per (target column, source column, helper) triple
for new_col, src_col, fn in (
    ("New Name", "Name", MyUdf.newname),
    ("Power Age", "Age", MyUdf.powerage),
):
    psdf[new_col] = psdf[src_col].apply(fn)

psdf.head()



Type: <class 'pyspark.pandas.frame.DataFrame'> 
     Name  Age Gender Country
0  Jacky   27   Male  Taiwan


Unnamed: 0,Name,Age,Gender,Country,New Name,Power Age
0,Jacky,27,Male,Taiwan,NEW_Jacky,729
1,John,32,Male,Taiwan,NEW_John,1024
2,Tom,42,Male,Japan,NEW_Tom,1764
3,Mark,16,Male,Taiwan,NEW_Mark,256
4,Sandy,23,Female,US,NEW_Sandy,529


In [17]:
# Spark DataFrame -> pandas DataFrame, saved as CSV so the streaming section
# (which reads the ./data folder) has an input file.
import os

os.makedirs("./data", exist_ok=True)  # to_csv() fails if the directory is missing
pandas_df = df.toPandas()
pandas_df.to_csv("./data/people.csv", index=False)

# Spark SQL

## Create TempView

In [18]:
# Register df twice: as a session-scoped temp view, and as a global temp view
# that is queried through the `global_temp` database.
df.createOrReplaceTempView(name="PEOPLE_TABLE")
df.createOrReplaceGlobalTempView(name="GLOBAL_PEOPLE_TABLE")

## SQL Operations

In [19]:
# Select everything from the temp view, then from the global temp view.
for view in ("PEOPLE_TABLE", "global_temp.GLOBAL_PEOPLE_TABLE"):
    ss.sql(f"SELECT * FROM {view}").show()

+-----+---+------+-------+
| Name|Age|Gender|Country|
+-----+---+------+-------+
|Jacky| 27|  Male| Taiwan|
| John| 32|  Male| Taiwan|
|  Tom| 42|  Male|  Japan|
| Mark| 16|  Male| Taiwan|
|Sandy| 23|Female|     US|
+-----+---+------+-------+

+-----+---+------+-------+
| Name|Age|Gender|Country|
+-----+---+------+-------+
|Jacky| 27|  Male| Taiwan|
| John| 32|  Male| Taiwan|
|  Tom| 42|  Male|  Japan|
| Mark| 16|  Male| Taiwan|
|Sandy| 23|Female|     US|
+-----+---+------+-------+



In [20]:
# Plain SQL query using the built-in Upper() function on the temp view
upper_name_query = """
        SELECT
           Name, Upper(Name) AS NEW_Name
        FROM
           PEOPLE_TABLE
       """
ss.sql(upper_name_query).show()

+-----+--------+
| Name|NEW_Name|
+-----+--------+
|Jacky|   JACKY|
| John|    JOHN|
|  Tom|     TOM|
| Mark|    MARK|
|Sandy|   SANDY|
+-----+--------+



In [21]:
# sql with UDF
class MySqlUdf:
    """String helpers registered as Spark SQL UDFs."""

    @staticmethod
    def lowercase(s: Optional[str]) -> Optional[str]:
        """Return ``s`` lower-cased; a null (``None``) input stays null."""
        return None if s is None else s.lower()

    @staticmethod
    def uppercase(s: Optional[str]) -> Optional[str]:
        """Return ``s`` upper-cased; a null (``None``) input stays null."""
        return None if s is None else s.upper()

# Register the helpers under the names the SQL below calls
for udf_name, py_func in (
    ("lowercase_udf", MySqlUdf.lowercase),
    ("uppercase_udf", MySqlUdf.uppercase),
):
    ss.udf.register(udf_name, py_func)

# Query the temp view through the registered UDFs
case_query = """
        SELECT
           Name, 
           lowercase_udf(Name) AS LOWER_Name,
           uppercase_udf(Name) AS UPPER_Name
        FROM
           PEOPLE_TABLE
       """
ss.sql(case_query).show()

+-----+----------+----------+
| Name|LOWER_Name|UPPER_Name|
+-----+----------+----------+
|Jacky|     jacky|     JACKY|
| John|      john|      JOHN|
|  Tom|       tom|       TOM|
| Mark|      mark|      MARK|
|Sandy|     sandy|     SANDY|
+-----+----------+----------+



# MLlib

### The DataFrame-based API is the primary API

In [59]:
# Correlation (Pearson and Spearman) between the 4 feature positions.
# Uses its own names (corr_data / corr_df) so the people DataFrame `df` and the
# RDD source list `data` from earlier cells are not overwritten.
from pyspark.ml.linalg import Vectors
from pyspark.ml.stat import Correlation

corr_data = [(Vectors.sparse(4, [(0, 1.0), (3, -2.0)]),),
             (Vectors.dense([4.0, 5.0, 0.0, 3.0]),),
             (Vectors.dense([6.0, 7.0, 0.0, 8.0]),),
             (Vectors.sparse(4, [(0, 9.0), (3, 1.0)]),)]
corr_df = ss.createDataFrame(corr_data, ["features"])

# Position 2 is 0.0 in every vector (constant), so its correlations print as nan.
r1 = Correlation.corr(corr_df, "features").head()
print("Pearson correlation matrix:\n" + str(r1[0]))

r2 = Correlation.corr(corr_df, "features", "spearman").head()
print("Spearman correlation matrix:\n" + str(r2[0]))

Pearson correlation matrix:
DenseMatrix([[1.        , 0.05564149,        nan, 0.40047142],
             [0.05564149, 1.        ,        nan, 0.91359586],
             [       nan,        nan, 1.        ,        nan],
             [0.40047142, 0.91359586,        nan, 1.        ]])
Spearman correlation matrix:
DenseMatrix([[1.        , 0.10540926,        nan, 0.4       ],
             [0.10540926, 1.        ,        nan, 0.9486833 ],
             [       nan,        nan, 1.        ,        nan],
             [0.4       , 0.9486833 ,        nan, 1.        ]])


In [60]:
# K-Means
import os

from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

# Load the sample data shipped with Spark. Pass an absolute file:// URI,
# otherwise it will read from file://home/jovyan/...
# SPARK_HOME falls back to the original hardcoded install location.
SPARK_HOME = os.environ.get("SPARK_HOME", "/usr/local/spark")
dataset = ss.read.format("libsvm").load(f"file://{SPARK_HOME}/data/mllib/sample_kmeans_data.txt")

# Train a k-means model with a fixed seed so the clustering is reproducible.
kmeans = KMeans().setK(2).setSeed(1)
model = kmeans.fit(dataset)

# Make predictions
predictions = model.transform(dataset)

# Evaluate clustering by computing the Silhouette score
evaluator = ClusteringEvaluator()

silhouette = evaluator.evaluate(predictions)
print("Silhouette with squared euclidean distance = " + str(silhouette))

# Show the result.
centers = model.clusterCenters()
print("Cluster Centers: ")
for center in centers:
    print(center)

Silhouette with squared euclidean distance = 0.9997530305375207
Cluster Centers: 
[9.1 9.1 9.1]
[0.1 0.1 0.1]


In [68]:
# CF (Collaborative Filtering) with ALS
import os

from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row

SEED = 42  # fixed seed so the train/test split, the model and the RMSE are reproducible

# Load the MovieLens sample shipped with Spark. Pass an absolute file:// URI,
# otherwise it will read from file://home/jovyan/...
# SPARK_HOME falls back to the original hardcoded install location.
SPARK_HOME = os.environ.get("SPARK_HOME", "/usr/local/spark")
lines = ss.read.text(f"file://{SPARK_HOME}/data/mllib/als/sample_movielens_ratings.txt").rdd
# Each line is parsed as "userId::movieId::rating::timestamp"
parts = lines.map(lambda row: row.value.split("::"))
ratingsRDD = parts.map(lambda p: Row(userId=int(p[0]), movieId=int(p[1]),
                                     rating=float(p[2]), timestamp=int(p[3])))
ratings = ss.createDataFrame(ratingsRDD)
(training, test) = ratings.randomSplit([0.8, 0.2], seed=SEED)

# Build the recommendation model using ALS on the training data.
# The cold start strategy is 'drop' so unseen users/items don't produce NaN evaluation metrics.
als = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating",
          coldStartStrategy="drop", seed=SEED)
model = als.fit(training)

# Evaluate the model by computing the RMSE on the test data
predictions = model.transform(test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

# Keep each recommendation DataFrame before displaying it: show() only prints
# and returns None.

# Top 10 movie recommendations for each user
userRecs = model.recommendForAllUsers(10)
userRecs.show(1)

# Top 10 user recommendations for each movie
movieRecs = model.recommendForAllItems(10)
movieRecs.show(1)

# Top 10 movie recommendations for a specified set of users
users = ratings.select(als.getUserCol()).distinct().limit(3)
userSubsetRecs = model.recommendForUserSubset(users, 10)
userSubsetRecs.show(1)

# Top 10 user recommendations for a specified set of movies
movies = ratings.select(als.getItemCol()).distinct().limit(3)
movieSubSetRecs = model.recommendForItemSubset(movies, 10)
movieSubSetRecs.show(1)

Root-mean-square error = 1.776502226798719
+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|    20|[{22, 4.568694}, ...|
+------+--------------------+
only showing top 1 row

+-------+--------------------+
|movieId|     recommendations|
+-------+--------------------+
|     20|[{17, 4.632046}, ...|
+-------+--------------------+
only showing top 1 row

+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|    26|[{22, 5.149181}, ...|
+------+--------------------+
only showing top 1 row

+-------+--------------------+
|movieId|     recommendations|
+-------+--------------------+
|     65|[{23, 4.6378446},...|
+-------+--------------------+
only showing top 1 row



## ML Pipeline

In [70]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer

# Prepare training documents from a list of (id, text, label) tuples.
training = ss.createDataFrame([
    (0, "a b c d e spark", 1.0),
    (1, "b d", 0.0),
    (2, "spark f g h", 1.0),
    (3, "hadoop mapreduce", 0.0)
], ["id", "text", "label"])

# Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.001)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])

# Fit the pipeline to training documents.
model = pipeline.fit(training)

# Prepare test documents, which are unlabeled (id, text) tuples.
test = ss.createDataFrame([
    (4, "spark i j k"),
    (5, "l m n"),
    (6, "spark hadoop spark"),
    (7, "apache hadoop")
], ["id", "text"])

# Make predictions on test documents and print columns of interest.
prediction = model.transform(test)
selected = prediction.select("id", "text", "probability", "prediction")
# The loop uses `pred` for the predicted label so that `prediction` still
# refers to the transformed DataFrame after the loop.
for rid, text, prob, pred in selected.collect():
    print(
        "(%d, %s) --> prob=%s, prediction=%f" % (rid, text, str(prob), pred)
    )

(4, spark i j k) --> prob=[0.6292098489668486,0.3707901510331514], prediction=0.000000
(5, l m n) --> prob=[0.984770006762304,0.015229993237696027], prediction=0.000000
(6, spark hadoop spark) --> prob=[0.13412348342566097,0.8658765165743391], prediction=1.000000
(7, apache hadoop) --> prob=[0.9955732114398529,0.00442678856014711], prediction=0.000000


# Spark Streaming

### Output Modes
- Append mode (default): Only the new rows added to the Result Table since the last trigger are output to the sink. This mode is supported only for queries where rows added to the Result Table are never going to change. Hence, it guarantees that each row is output only once (assuming a fault-tolerant sink). For example, queries with only select, where, map, flatMap, filter, join, etc. support Append mode.

- Complete mode: The whole Result Table will be outputted to the sink after every trigger. This is supported for aggregation queries.

- Update mode: (Available since Spark 2.1.1) Only the rows in the Result Table that were updated since the last trigger are output to the sink. Unlike Complete mode, it outputs only the changed rows; if the query contains no aggregations, it is equivalent to Append mode.

In [42]:
# Schema for the streamed CSV files (same columns as the batch DataFrame)
dataSchema = (
    StructType()
    .add('Name', StringType(), False)
    .add('Age', IntegerType(), True)
    .add('Gender', StringType(), True)
    .add('Country', StringType(), True)
)

# Read the ./data folder as a stream: CSV files found there are picked up
# (at most one file per trigger) and appended to stream_df, an unbounded table.
# The printed schema reports every column as nullable, Name included.
stream_reader = (
    ss.readStream
      .format("csv")
      .schema(dataSchema)
      .option("header", True)
      .option("maxFilesPerTrigger", 1)
)
stream_df = stream_reader.load("./data")

# Check that this is a streaming DataFrame
stream_df.printSchema()
print(f"isStreaming: {stream_df.isStreaming}")

root
 |-- Name: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Country: string (nullable = true)

isStreaming: True


In [43]:
# Transform: add Power_Age with the UDF, then aggregate per country.
# NOTE(review): stream_df is rebound to the aggregated stream, so re-running this
# cell alone fails (the aggregate has no Age column); re-run the read cell first.
stream_df = (
    stream_df
    .withColumn("Power_Age", powerage_udf(col("Age")))
    .groupBy("Country")
    .agg(
        count("Country").alias("Count_Country"),
        min("Age").alias("Min_Age"),
        max("Age").alias("Max_Age"),
        sum("Age").alias("Sum_Age"),
        avg("Age").alias("Average_Age"),
        avg("Power_Age").alias("Average_Power_Age"),
    )
    .where(col("Average_Age") > 24)
)

In [44]:
# Memory sink: results are kept in an in-memory table named MY_TABLE that the
# next cell queries with SQL. "complete" mode re-emits the whole aggregate on every trigger.
# Keep the StreamingQuery handle so the query can be inspected or stopped later
# (e.g. query.stop()); previously the handle was discarded.
query = (
    stream_df.writeStream
    .format("memory")
    .outputMode("complete")
    .queryName("MY_TABLE")
    .start()
)
query

<pyspark.sql.streaming.StreamingQuery at 0xffff4a023250>

In [48]:
# Query the in-memory sink table. It may take a while before the stream has
# processed input, so an early run can show incomplete output.
my_table_df = ss.sql("select * from MY_TABLE")
my_table_df.show()

+-------+-------------+-------+-------+-------+-----------+-----------------+
|Country|Count_Country|Min_Age|Max_Age|Sum_Age|Average_Age|Average_Power_Age|
+-------+-------------+-------+-------+-------+-----------+-----------------+
| Taiwan|            3|     16|     32|     75|       25.0|669.6666666666666|
|  Japan|            1|     42|     42|     42|       42.0|           1764.0|
+-------+-------------+-------+-------+-------+-----------+-----------------+



# Submit a spark job

- cd /usr/local/spark/bin/
- spark-submit {MY-SPARK-CODE}.py