In [1]:
# practice in 
# https://www.analyticsvidhya.com/blog/2016/10/spark-dataframe-and-operations/

# spark version 2.4.0
# spark mode : standalone cluster
# pyspark version : anaconda python 3.7 jupyter

from pyspark.sql import Row


l = [("Vincent",22),("Bob",26),("Jack",26)]




In [2]:
rdd = sc.parallelize(l)


In [3]:
# from rdd to spark.sql.dataframe
people = rdd.map(lambda x : Row(name = x[0],age = x[1]))
schemaPeople = sqlContext.createDataFrame(people)

In [4]:
schemaPeople.show()

+---+-------+
|age|   name|
+---+-------+
| 22|Vincent|
| 26|    Bob|
| 26|   Jack|
+---+-------+



In [5]:
type(schemaPeople)

pyspark.sql.dataframe.DataFrame

In [6]:
# load csv in HDFS spark2.0 up
train = spark.read.csv("hdfs://master/tmp/data_hsu/practice/train.csv", header=True)
test = spark.read.csv("hdfs://master/tmp/data_hsu/practice/test.csv", header=True)
train.show()

+-------+----------+------+-----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|User_ID|Product_ID|Gender|  Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|
+-------+----------+------+-----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|1000001| P00069042|     F| 0-17|        10|            A|                         2|             0|                 3|              null|              null|    8370|
|1000001| P00248942|     F| 0-17|        10|            A|                         2|             0|                 1|                 6|                14|   15200|
|1000001| P00087842|     F| 0-17|        10|            A|                         2|             0|                12|              null|              null|    1422

In [7]:
# show Schema
train.printSchema()

root
 |-- User_ID: string (nullable = true)
 |-- Product_ID: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Occupation: string (nullable = true)
 |-- City_Category: string (nullable = true)
 |-- Stay_In_Current_City_Years: string (nullable = true)
 |-- Marital_Status: string (nullable = true)
 |-- Product_Category_1: string (nullable = true)
 |-- Product_Category_2: string (nullable = true)
 |-- Product_Category_3: string (nullable = true)
 |-- Purchase: string (nullable = true)



In [8]:
# show head
train.head(5)


[Row(User_ID='1000001', Product_ID='P00069042', Gender='F', Age='0-17', Occupation='10', City_Category='A', Stay_In_Current_City_Years='2', Marital_Status='0', Product_Category_1='3', Product_Category_2=None, Product_Category_3=None, Purchase='8370'),
 Row(User_ID='1000001', Product_ID='P00248942', Gender='F', Age='0-17', Occupation='10', City_Category='A', Stay_In_Current_City_Years='2', Marital_Status='0', Product_Category_1='1', Product_Category_2='6', Product_Category_3='14', Purchase='15200'),
 Row(User_ID='1000001', Product_ID='P00087842', Gender='F', Age='0-17', Occupation='10', City_Category='A', Stay_In_Current_City_Years='2', Marital_Status='0', Product_Category_1='12', Product_Category_2=None, Product_Category_3=None, Purchase='1422'),
 Row(User_ID='1000001', Product_ID='P00085442', Gender='F', Age='0-17', Occupation='10', City_Category='A', Stay_In_Current_City_Years='2', Marital_Status='0', Product_Category_1='12', Product_Category_2='14', Product_Category_3=None, Purchase

In [9]:
# count row 
train.count() , test.count()


(550068, 233599)

In [10]:
# show colume
# sql.dataframe.colume a list object
len(train.columns), train.columns


(12,
 ['User_ID',
  'Product_ID',
  'Gender',
  'Age',
  'Occupation',
  'City_Category',
  'Stay_In_Current_City_Years',
  'Marital_Status',
  'Product_Category_1',
  'Product_Category_2',
  'Product_Category_3',
  'Purchase'])

In [11]:
len(test.columns), test.columns

(11,
 ['User_ID',
  'Product_ID',
  'Gender',
  'Age',
  'Occupation',
  'City_Category',
  'Stay_In_Current_City_Years',
  'Marital_Status',
  'Product_Category_1',
  'Product_Category_2',
  'Product_Category_3'])

In [12]:
# show statistics info
train.describe().show()


+-------+------------------+----------+------+------+-----------------+-------------+--------------------------+-------------------+------------------+------------------+------------------+-----------------+
|summary|           User_ID|Product_ID|Gender|   Age|       Occupation|City_Category|Stay_In_Current_City_Years|     Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|         Purchase|
+-------+------------------+----------+------+------+-----------------+-------------+--------------------------+-------------------+------------------+------------------+------------------+-----------------+
|  count|            550068|    550068|550068|550068|           550068|       550068|                    550068|             550068|            550068|            376430|            166821|           550068|
|   mean|1003028.8424013031|      null|  null|  null|8.076706879876669|         null|         1.468494139793958|0.40965298835780306| 5.404270017525106| 9.84232925112238

In [13]:
# show one column statistics
train.describe("Product_ID").show()

# min and max are ACSII value

+-------+----------+
|summary|Product_ID|
+-------+----------+
|  count|    550068|
|   mean|      null|
| stddev|      null|
|    min| P00000142|
|    max|  P0099942|
+-------+----------+



In [14]:
# select one and more colume 
train.select("User_ID", "Age").show(10)

+-------+-----+
|User_ID|  Age|
+-------+-----+
|1000001| 0-17|
|1000001| 0-17|
|1000001| 0-17|
|1000001| 0-17|
|1000002|  55+|
|1000003|26-35|
|1000004|46-50|
|1000004|46-50|
|1000004|46-50|
|1000005|26-35|
+-------+-----+
only showing top 10 rows



In [15]:
# use distinct in spark dataframe
train.select("Product_ID").distinct().count(), test.select("Product_ID").distinct().count()


(3631, 3491)

In [16]:
# try to find product not in train but do in the test file
# use subtract
diff_cat = test.select("Product_ID").subtract(train.select("Product_ID"))
diff_cat.distinct().count()


# 46 different categories in test but not in train

46

In [17]:
# use cross table
train.crosstab("Age", "Gender").show()


+----------+-----+------+
|Age_Gender|    F|     M|
+----------+-----+------+
|      0-17| 5083| 10019|
|     46-50|13199| 32502|
|     18-25|24628| 75032|
|     36-45|27170| 82843|
|       55+| 5083| 16421|
|     51-55| 9894| 28607|
|     26-35|50752|168835|
+----------+-----+------+



In [18]:
# drop Duplicates
train.select("Age", "Gender").dropDuplicates().show()


+-----+------+
|  Age|Gender|
+-----+------+
|51-55|     F|
|18-25|     M|
| 0-17|     F|
|46-50|     M|
|18-25|     F|
|  55+|     M|
|  55+|     F|
|36-45|     M|
|26-35|     F|
| 0-17|     M|
|36-45|     F|
|51-55|     M|
|26-35|     M|
|46-50|     F|
+-----+------+



In [19]:
# process with NA
# 1. drop null
# param (how) three process 'any', 'all'
# param (thresh) use a threshold
# param (subset) list of column names to consider

# dropna return new dataframe 
# 不影響原有的資料

train.dropna(how='all').count()
# output 550068 -> not row have all na value

train.dropna(how='any').count()
# output 166821


166821

In [20]:
# process with NA
# 2. try to fill it
# fillna two param (value,subset)
# value take a dict param 
# 指定哪些欄位填入哪些值

# only fill Product_Category_2 with -1
train.fillna("-1").show(3)

# in spark 2.3 up 
# 填入 na 會自動指定 資料格式
# 填入 int float 就只會 fill 欄位為 int float 的 NA




+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|User_ID|Product_ID|Gender| Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|1000001| P00069042|     F|0-17|        10|            A|                         2|             0|                 3|                -1|                -1|    8370|
|1000001| P00248942|     F|0-17|        10|            A|                         2|             0|                 1|                 6|                14|   15200|
|1000001| P00087842|     F|0-17|        10|            A|                         2|             0|                12|                -1|                -1|    1422|
+---

In [40]:
# filter rows
# like SQL where

train.filter(train.Purchase > 15000).count()


110523

In [22]:
# SQL group by
# use groupby

# agg -> average
train.groupBy("Age").agg({'Purchase': 'mean'}).show()

+-----+-----------------+
|  Age|    avg(Purchase)|
+-----+-----------------+
|18-25|9169.663606261289|
|26-35|9252.690632869888|
| 0-17|8933.464640444974|
|46-50|9208.625697468327|
|51-55|9534.808030960236|
|36-45|9331.350694917874|
|  55+|9336.280459449405|
+-----+-----------------+



In [23]:
# groupby and sum

train.groupBy("Age").count().show()

+-----+------+
|  Age| count|
+-----+------+
|18-25| 99660|
|26-35|219587|
| 0-17| 15102|
|46-50| 45701|
|51-55| 38501|
|36-45|110013|
|  55+| 21504|
+-----+------+



In [24]:
# sample our data
# 3 param
# withReplacement     取後放回或不放回
# fraction     指定要 sample 多少樣本
# seed     just....seed 


# Seed 相同 取出的 樣本 相同
t1 = train.sample(True, 1.2, 42) # 超過 100% 的 sample 必指定 replacement = T
t2 = train.sample(False, 0.3, 42)
t1.count(), t2.count()

(659494, 164751)

In [25]:
# use apply to one column 
# like pandas apply
# result are RDD in spark

# same in spark 
# map do not action

# in spark 2.0 up must call .rdd first to use mapReduce
train.select("User_ID").rdd.map(lambda x : (x,1)).take(5)


[(Row(User_ID='1000001'), 1),
 (Row(User_ID='1000001'), 1),
 (Row(User_ID='1000001'), 1),
 (Row(User_ID='1000001'), 1),
 (Row(User_ID='1000002'), 1)]

In [26]:
# orderBy to sort dataframe
# two param 
# list of columns
# ascending = T or F

train.orderBy("Purchase", ascending = False).show(10)

# also can use like

train.orderBy(train.Purchase.desc()).show(5)

train.orderBy(["Purchase", "User_ID"], ascending=True).show(3)

+-------+----------+------+-----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|User_ID|Product_ID|Gender|  Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|
+-------+----------+------+-----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|1002750| P00111842|     F|26-35|        15|            B|                         1|             0|                 8|              null|              null|    9999|
|1005341| P00277642|     F|26-35|         3|            C|                         1|             1|                 2|                 3|                10|    9999|
|1000238| P00312342|     F|51-55|         7|            B|                         0|             0|                 8|              null|              null|    9999

In [27]:
# add new columns 
train.withColumn('Purchase_new', train.Purchase / 2.0).select("Purchase", "Purchase_new").show(4)



+--------+------------+
|Purchase|Purchase_new|
+--------+------------+
|    8370|      4185.0|
|   15200|      7600.0|
|    1422|       711.0|
|    1057|       528.5|
+--------+------------+
only showing top 4 rows



In [28]:
# drop a columns


test.drop('City_Category').columns


['User_ID',
 'Product_ID',
 'Gender',
 'Age',
 'Occupation',
 'Stay_In_Current_City_Years',
 'Marital_Status',
 'Product_Category_1',
 'Product_Category_2',
 'Product_Category_3']

In [29]:
# use UDF
# User define function
diff = test.select("Product_ID").subtract(train.select("Product_ID"))

diff.distinct().count()


46

In [30]:
# remove diff cate 
# return rdd of diff product id

not_found_cat = diff.distinct().rdd.map(lambda x : x[0]).collect()
len(not_found_cat)

46

In [31]:
# register UDF
# two param
# function
# return type 
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf

udf_lookup = udf(lambda x: '-1' if x in not_found_cat else x, returnType=StringType())

# test.Product_ID same with test["Product_ID"]

k = test.withColumn("NEW_Product_ID", udf_lookup(test["Product_ID"])).select("NEW_Product_ID")



In [32]:
# lookup again in test and train if there is different cate
diff = k.select("NEW_Product_ID").subtract(train.select("Product_ID"))
diff.distinct().count()


# output : 1
# mean only one different with -1 and other

1

In [33]:
diff.collect()



[Row(NEW_Product_ID='-1')]

In [35]:
# covert dataframe to table 
# table also use in HIVE

train.registerTempTable('train_table')

+--------+
|Purchase|
+--------+
|    8370|
|   15200|
|    1422|
|    1057|
|    7969|
|   15227|
|   19215|
|   15854|
|   15686|
|    7871|
+--------+
only showing top 10 rows



In [41]:

sqlContext.sql('select Purchase from train_table').show(5)

+--------+
|Purchase|
+--------+
|    8370|
|   15200|
|    1422|
|    1057|
|    7969|
+--------+
only showing top 5 rows



In [57]:
# more SQL
from pyspark.sql.functions import max

# select max purchase and groupby age
sqlContext.sql('select Age, avg(Purchase) from train_table group by Age').show()



+-----+-----------------------------+
|  Age|avg(CAST(Purchase AS DOUBLE))|
+-----+-----------------------------+
|18-25|            9169.663606261289|
|26-35|            9252.690632869888|
| 0-17|            8933.464640444974|
|46-50|            9208.625697468327|
|51-55|            9534.808030960236|
|36-45|            9331.350694917874|
|  55+|            9336.280459449405|
+-----+-----------------------------+

