In [1]:
# This cell is not needed when this jupyter notebook is running on a Sagemaker instance
# This is only needed when running it on local laptop
import pyspark
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.sql import SparkSession, HiveContext
# Set up a spark session with leveraging all available CPUs
spark = SparkSession \
        .builder \
        .master('local[*]')\
        .appName("Demo") \
        .config("hive.metastore.uris", "thrift://localhost:9083") \
        .config("spark.driver.bindAddress", "127.0.0.1") \
        .enableHiveSupport() \
        .getOrCreate()
print("Spark Version: " + spark.version)

Spark Version: 3.0.1


In [41]:
spark.sparkContext.getConf().getAll()

[('spark.app.id', 'local-1613358889945'),
 ('spark.driver.host', '92.242.140.21'),
 ('spark.sql.catalogImplementation', 'hive'),
 ('spark.rdd.compress', 'True'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.master', 'local[*]'),
 ('spark.submit.pyFiles', ''),
 ('spark.executor.id', 'driver'),
 ('spark.driver.port', '60997'),
 ('spark.submit.deployMode', 'client'),
 ('spark.ui.showConsoleProgress', 'true'),
 ('spark.app.name', 'Demo'),
 ('spark.driver.bindAddress', '127.0.0.1')]

In [15]:
from pyspark.sql.functions import col, split, mean, udf
from pyspark.sql.types import *
from pyspark.ml.feature import CountVectorizer
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler
from pyspark.sql import Row, Column
import numpy as np

# Create DataFrame via Row and Schema

In [3]:
# Define rows and schena
rows = [Row(0, "red"), Row(1, "blue")]
schema = ["Id", "Color"]
df = spark.createDataFrame(rows, schema)
df.show()

+---+-----+
| Id|Color|
+---+-----+
|  0|  red|
|  1| blue|
+---+-----+



In [4]:
# Even simpler
rows = [(0, "red"), (1, "blue")]
schema = "Id INT, Color STRING"
df = spark.createDataFrame(rows, schema)
df.show()

+---+-----+
| Id|Color|
+---+-----+
|  0|  red|
|  1| blue|
+---+-----+



In [10]:
df.write.mode("overwrite").saveAsTable('test_db')

In [7]:
# Display Spark query logical and physical plans
df.explain(True)

== Parsed Logical Plan ==
LogicalRDD [Id#13, Color#14], false

== Analyzed Logical Plan ==
Id: int, Color: string
LogicalRDD [Id#13, Color#14], false

== Optimized Logical Plan ==
LogicalRDD [Id#13, Color#14], false

== Physical Plan ==
*(1) Scan ExistingRDD[Id#13,Color#14]



# HiveContext

In [44]:
hive_Context = HiveContext(spark.sparkContext)

In [25]:
print(hive_Context.getConf("spark.sql.warehouse.dir"))

file:/Users/cheyaohu/WorkDocs/TFC/Spark/spark-warehouse/


In [24]:
hive_Context.sql("CREATE TABLE IF NOT EXISTS TestTable (key INT, value STRING)")

DataFrame[]

In [27]:
print(hive_Context.sql("SHOW Tables").show())

+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
| default|  test_db|      false|
| default|testtable|      false|
+--------+---------+-----------+

None


# View Metadata in Spark Session 

In [8]:
print(spark.sql('SHOW DATABASES').show())

+---------+
|namespace|
+---------+
|  default|
+---------+

None


In [9]:
print(spark.sql('SHOW TABLES').show())

+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
| default|  test_db|      false|
+--------+---------+-----------+

None


In [12]:
print(spark.sql("DESCRIBE test_db"))

DataFrame[col_name: string, data_type: string, comment: string]


In [28]:
spark.catalog.listDatabases()

[Database(name='default', description='Default Hive database', locationUri='file:/Users/cheyaohu/WorkDocs/TFC/Spark/spark-warehouse')]

In [29]:
spark.catalog.listTables()

[Table(name='test_db', database='default', description=None, tableType='MANAGED', isTemporary=False),
 Table(name='testtable', database='default', description=None, tableType='MANAGED', isTemporary=False)]

# Table Join

In [23]:
# Create two dataframes
rows = [(0, "red"), (1, "blue")]
schema = "Id INT, Color STRING"
df = spark.createDataFrame(rows, schema)

rows1 = [(0, "good"), (1, "bad")]
schema1 = "Id INT, Mark STRING"
df1 = spark.createDataFrame(rows1, schema1)

tmp_df = df.join(df1, on="Id", how="inner")
tmp_df.show()

+---+-----+----+
| Id|Color|Mark|
+---+-----+----+
|  1| blue| bad|
|  0|  red|good|
+---+-----+----+



# Table Union

In [24]:
# Create two dataframes
rows = [(0, "red"), (1, "blue")]
schema = "Id INT, Color STRING"
df = spark.createDataFrame(rows, schema)

rows1 = [(3, "green"), (4, "yellow")]
schema1 = "Id INT, Color STRING"
df1 = spark.createDataFrame(rows1, schema1)

tmp_df = df.union(df1)
tmp_df.show()

+---+------+
| Id| Color|
+---+------+
|  0|   red|
|  1|  blue|
|  3| green|
|  4|yellow|
+---+------+



# Datetime Conversion

In [5]:
from pyspark.sql.functions import to_timestamp
df = spark.createDataFrame([('1997-02-28 10:30:00',)], ['t'])
df.show()

+-------------------+
|                  t|
+-------------------+
|1997-02-28 10:30:00|
+-------------------+



In [6]:
df.printSchema()

root
 |-- t: string (nullable = true)



In [7]:
df1 = df.select(to_timestamp(df.t, 'yyyy-MM-dd HH:mm:ss').alias('dt'))
df1.printSchema()

root
 |-- dt: timestamp (nullable = true)



In [8]:
df1.show()

+-------------------+
|                 dt|
+-------------------+
|1997-02-28 10:30:00|
+-------------------+



In [9]:
from pyspark.sql.functions import year, month, dayofmonth

df1.select(
    year("dt").alias('year'), 
    month("dt").alias('month'), 
    dayofmonth("dt").alias('day')
).show()

+----+-----+---+
|year|month|day|
+----+-----+---+
|1997|    2| 28|
+----+-----+---+



# User Defined Function (UDF)

In [21]:
def date_convert(date_col, date_format='yyyy-MM-dd HH:mm:ss'):
    date_list = date_col.split("-")
    year = date_list[0]
    month = date_list[1]
    day = date_list[2].split(" ")[0]
    return f'{month}/{day}/{year}'
date_convert_udf = udf(lambda x: date_convert(x) if x is not None else None, StringType())

In [28]:
df = spark.createDataFrame([('1997-02-28 10:30:00',)], ['date'])
df.show()

+-------------------+
|               date|
+-------------------+
|1997-02-28 10:30:00|
+-------------------+



In [29]:
df.select(date_convert_udf(col('date')).alias('date-formatted') ).show()

+--------------+
|date-formatted|
+--------------+
|    02/28/1997|
+--------------+



In [30]:
df.show()

+-------------------+
|               date|
+-------------------+
|1997-02-28 10:30:00|
+-------------------+



In [31]:
df = df.withColumn('date-formatted', date_convert_udf(col('date')))
df.show()

+-------------------+--------------+
|               date|date-formatted|
+-------------------+--------------+
|1997-02-28 10:30:00|    02/28/1997|
+-------------------+--------------+



# Rename Column

In [26]:
# Create two dataframes
rows = [(0, "red"), (1, "blue")]
schema = "Id INT, Color STRING"
df = spark.createDataFrame(rows, schema)
df = df.withColumnRenamed("Color", "Color_Name")
df.show(truncate=False)

+---+----------+
|Id |Color_Name|
+---+----------+
|0  |red       |
|1  |blue      |
+---+----------+



# Drop Missing Values

In [40]:
# Create a data frame

df = spark.createDataFrame([
    (0, "Red"),
    (1, "Blue"),
    (2, "Green"),
    (3, None)
], ["id", "Color"])
df.show()

+---+-----+
| id|Color|
+---+-----+
|  0|  Red|
|  1| Blue|
|  2|Green|
|  3| null|
+---+-----+



Drop rows with NULL values in ANY columns

In [41]:
df.na.drop("any").show(truncate=False)

+---+-----+
|id |Color|
+---+-----+
|0  |Red  |
|1  |Blue |
|2  |Green|
+---+-----+



Drop rows with NULL values in ALL columns

In [43]:
df.na.drop("all").show(truncate=False)

+---+-----+
|id |Color|
+---+-----+
|0  |Red  |
|1  |Blue |
|2  |Green|
|3  |null |
+---+-----+



Drop rows with NULL values by using ```dropna()```

In [42]:
df.dropna().show(truncate=False)

+---+-----+
|id |Color|
+---+-----+
|0  |Red  |
|1  |Blue |
|2  |Green|
+---+-----+



# Fill the NULL Values

In [44]:
# Create a data frame

df = spark.createDataFrame([
    (0, "Red"),
    (1, "Blue"),
    (2, "Green"),
    (3, None)
], ["id", "Color"])
df.show()

+---+-----+
| id|Color|
+---+-----+
|  0|  Red|
|  1| Blue|
|  2|Green|
|  3| null|
+---+-----+



Fill the null values with "NA"

In [45]:
df.na.fill('NA').show()

+---+-----+
| id|Color|
+---+-----+
|  0|  Red|
|  1| Blue|
|  2|Green|
|  3|   NA|
+---+-----+



Fill the null values with mean or average

In [46]:
df = spark.createDataFrame([
    (0, "Red"),
    (1, "Blue"),
    (2, "Green"),
    (None, "Yellow")
], ["id", "Color"])
df.show()

+----+------+
|  id| Color|
+----+------+
|   0|   Red|
|   1|  Blue|
|   2| Green|
|null|Yellow|
+----+------+



In [50]:
mean_val=df.select(mean(df.id)).collect()
df.na.fill(mean_val[0][0],subset=['id']).show()

+---+------+
| id| Color|
+---+------+
|  0|   Red|
|  1|  Blue|
|  2| Green|
|  1|Yellow|
+---+------+



# One-hot Encoding

In [16]:
# Create a data frame

df = spark.createDataFrame([
    (0, "Red"),
    (1, "Blue"),
    (2, "Green"),
    (3, "White")
], ["id", "Color"])
df.show()

+---+-----+
| id|Color|
+---+-----+
|  0|  Red|
|  1| Blue|
|  2|Green|
|  3|White|
+---+-----+



In [17]:
# One-hot encoding with Pyspark CountVectorizer

df = df.withColumn("Color_Array", split(col("Color")," "))
colorVectorizer = CountVectorizer(inputCol="Color_Array", outputCol="Color_OneHotEncoded", vocabSize=4, minDF=1.0)
colorVectorizer_model = colorVectorizer.fit(df)
df_ohe = colorVectorizer_model.transform(df)
df_ohe.show(truncate=False)

+---+-----+-----------+-------------------+
|id |Color|Color_Array|Color_OneHotEncoded|
+---+-----+-----------+-------------------+
|0  |Red  |[Red]      |(4,[3],[1.0])      |
|1  |Blue |[Blue]     |(4,[1],[1.0])      |
|2  |Green|[Green]    |(4,[0],[1.0])      |
|3  |White|[White]    |(4,[2],[1.0])      |
+---+-----+-----------+-------------------+



In [21]:
# Convert the one-hot encoded column into numpy array

x_3d = np.array(df_ohe.select('Color_OneHotEncoded').collect())
X = x_3d.reshape(4, 4)
X

array([[0., 0., 0., 1.],
       [0., 1., 0., 0.],
       [1., 0., 0., 0.],
       [0., 0., 1., 0.]])

# Label Index Encoding

In [22]:
# Create a data frame

df = spark.createDataFrame([
    (0, "Red"),
    (1, "Blue"),
    (2, "Green"),
    (3, "White")
], ["id", "Color"])
df.show()

+---+-----+
| id|Color|
+---+-----+
|  0|  Red|
|  1| Blue|
|  2|Green|
|  3|White|
+---+-----+



In [25]:
# Convert a column into numerical categories (indexing)

labelIndexer = StringIndexer(inputCol="Color", outputCol="Color_Index")
labelIndexer_model = labelIndexer.fit(df)
df_lie = labelIndexer_model.transform(df)
df_lie.show(truncate=False)

+---+-----+-----------+
|id |Color|Color_Index|
+---+-----+-----------+
|0  |Red  |2.0        |
|1  |Blue |0.0        |
|2  |Green|1.0        |
|3  |White|3.0        |
+---+-----+-----------+



# Feature Assembler

In [26]:
# Create a data frame

df = spark.createDataFrame([
    (0, "Red"),
    (1, "Blue"),
    (2, "Green"),
    (3, "White")
], ["id", "Color"])
df = df.withColumn("Color_Array", split(col("Color")," "))
df.show()

+---+-----+-----------+
| id|Color|Color_Array|
+---+-----+-----------+
|  0|  Red|      [Red]|
|  1| Blue|     [Blue]|
|  2|Green|    [Green]|
|  3|White|    [White]|
+---+-----+-----------+



In [33]:
# setup one-hot encoding

colorVectorizer = CountVectorizer(inputCol="Color_Array", outputCol="Color_OneHotEncoded", vocabSize=4, minDF=1.0)
colorVectorizer_model = colorVectorizer.fit(df)
df_ohe = colorVectorizer_model.transform(df)

labelIndexer = StringIndexer(inputCol="Color", outputCol="Color_Index")
labelIndexer_model = labelIndexer.fit(df_ohe)
df_lie = labelIndexer_model.transform(df_ohe)

In [35]:
vecAssembler = VectorAssembler(inputCols=["Color_OneHotEncoded", "Color_Index"], outputCol="features")
df_va = vecAssembler.transform(df_lie)
df_va.show(truncate=False)

+---+-----+-----------+-------------------+-----------+-------------------+
|id |Color|Color_Array|Color_OneHotEncoded|Color_Index|features           |
+---+-----+-----------+-------------------+-----------+-------------------+
|0  |Red  |[Red]      |(4,[0],[1.0])      |2.0        |(5,[0,4],[1.0,2.0])|
|1  |Blue |[Blue]     |(4,[3],[1.0])      |0.0        |(5,[3],[1.0])      |
|2  |Green|[Green]    |(4,[2],[1.0])      |1.0        |(5,[2,4],[1.0,1.0])|
|3  |White|[White]    |(4,[1],[1.0])      |3.0        |(5,[1,4],[1.0,3.0])|
+---+-----+-----------+-------------------+-----------+-------------------+



# Numerical Scaling

In [61]:
# Create a data frame

df = spark.createDataFrame([
    (0, 10.0),
    (1, 11.0),
    (2, 12.0),
    (3, 13.0)
], ["Factor", "Color"])
df.show()

+------+-----+
|Factor|Color|
+------+-----+
|     0| 10.0|
|     1| 11.0|
|     2| 12.0|
|     3| 13.0|
+------+-----+



In [66]:
va = VectorAssembler(inputCols=["Factor","Color"], outputCol="Color_VA")
df_tmp = va.transform(df)
scaler = StandardScaler(inputCol="Color_VA", outputCol="scaledColor", withStd=True, withMean=True)

# Compute summary statistics by fitting the StandardScaler
scalerModel = scaler.fit(df_tmp)

# Normalize each feature to have unit standard deviation.
scaledData = scalerModel.transform(df_tmp)
scaledData.show(truncate=False)

+------+-----+----------+-----------------------------------------+
|Factor|Color|Color_VA  |scaledColor                              |
+------+-----+----------+-----------------------------------------+
|0     |10.0 |[0.0,10.0]|[-1.161895003862225,-1.161895003862225]  |
|1     |11.0 |[1.0,11.0]|[-0.3872983346207417,-0.3872983346207417]|
|2     |12.0 |[2.0,12.0]|[0.3872983346207417,0.3872983346207417]  |
|3     |13.0 |[3.0,13.0]|[1.161895003862225,1.161895003862225]    |
+------+-----+----------+-----------------------------------------+



In [1]:
!ls

DataEngineering_PySpark.html
DataEngineering_PySpark.ipynb
LearningSpark-v2.0.pdf
Spark Streaming + Kinesis Integration - Spark 3.0.1 Documentation.pdf
derby.log
[34mmetastore_db[m[m
scala - Cannot connect to Hive metastore from Spark application - Stack Overflow.pdf
[34mspark-warehouse[m[m


# Handle Json

In [143]:
df = spark.read.options(header='True', inferSchema='True', delimiter='\t').format("csv").load("example.txt").na.drop()
df.show(5, truncate=False)

+----+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|id  |usage_data                                                                                                                                                                                                                                                                                                                                                                                                          |
+----+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [142]:
from pyspark.sql.functions import udf, explode, json_tuple
def str_to_arr(my_string):
    my_list = my_string.replace('[', "").replace(']', "").split("},", -1)
    results = []
    for item in my_list:
        if item[-1] != "}":
            item = item + "}"
        if item[0:2] == '"{':
            item = item.replace('"{', '{')
        results.append(item.replace('""', '"'))
    return results
str_to_arr_udf = udf(str_to_arr, ArrayType(StringType()))
# convert string to array
df = df.withColumn('usage_data_arr',str_to_arr_udf(df["usage_data"]))
# split array to rows
df = df.select(df.id,explode(df.usage_data_arr))
# convert Json items into columns
df.select(col("id"),json_tuple(col("col"),"a","b")) \
    .toDF("id","a","b") \
    .show(5, truncate=False)

+----+------------+-------+
|id  |a           |b      |
+----+------------+-------+
|9747|WhatsApp    |1770276|
|9747|TikTok      |782435 |
|9747|Huawei Home |191391 |
|9747|Chrome      |86829  |
|9748|Clash Royale|4305710|
+----+------------+-------+
only showing top 5 rows

