In [1]:
import os
os.environ["SPARK_HOME"] = '/usr/share/spark'
os.environ["PYSPARK_PYTHON"] = '/usr/share/miniconda2/envs/py36/bin/python'
# os.environ["PYSPARK_DRIVER_PYTHON"] = '/ldap_home/meng.hu/anaconda3/envs/python36/bin/python'
os.environ["PYSPARK_DRIVER_PYTHON"] = '/usr/share/miniconda2/envs/py36/bin/python'
from datetime import datetime, timedelta
from os.path import basename
import findspark
findspark.init()
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession, Row, SQLContext
import pyspark.sql.functions as sqlf
import pyspark.sql.types as sqlt
import pandas as pd

In [2]:
# spark init
name = 'fdy_spark_basics_demo'
port = 30011
SPARK_SERVICE = None

SPARK_CONF = SparkConf().set('spark.locality.wait', '1000ms') \
    .set('spark.executor.instances', '80') \
    .set('spark.executor.cores', '2') \
    .set("spark.sql.shuffle.partitions", '400') \
    .set('spark.default.parallelism', '400') \
    .set('spark.executor.memory', '5g') \
    .set('spark.ui.port', port) \
    .set('spark.yarn.queue', 'dev') \
    .set('spark.sql.session.timeZone', 'GMT+7') \
    .set('spark.network.timeout', '500') \
    .set('spark.sql.execution.arrow.enable','true') \
    .set("spark.sql.execution.arrow.fallback.enabled", "true") \

sc = SparkContext(SPARK_SERVICE, appName=name, conf=SPARK_CONF)
sqlContext = SQLContext(sc)
spark = SparkSession.builder \
    .enableHiveSupport() \
    .getOrCreate()

## 1. Read and write files

In [None]:
# csv
# from hdfs dir
spark_df = spark.read.format("csv").option("header", "true") \
        .load("hdfs:///projects/regds_fdy/fdy_features_logmerchant_feature2020-03-30.csv")
# from local file
pandas_df = pd.read_csv('/ldap_home/meng.hu/notebooks/accurary/test_merchant.csv')
spark_df = spark.createDataFrame(pandas_df)

# write to csv
# to hadoop dir
spark_df.repartition(1) \
    .write.format("csv") \
    .option("header", "true") \
    .mode("overwrite") \
    .save("hdfs:///projects/regds_fdy/demo_test/demo_shipper.csv")
# to local file
pandas_df = spark_df.toPandas()
pandas_df.to_csv('out_testshipper.csv', index=None, header=True)

In [None]:
# to json or json array
time_from = '2020-05-20 00:00:00'
time_to ='2020-05-21 00:00:00'
REQUEST_LOG_TAB = 'foody.foody_partner_db__order_assign_shipper_batch_processing_log_tab'
sql =  '''
        select id, city_id, processing_time, grass_date as date, create_time, processing_info, sharding 
        from {} 
        where from_unixtime(create_time) between '{}' and '{}'
        and city_id = 217
       '''.format(REQUEST_LOG_TAB, time_from, time_to)
spark_df = spark.sql(sql)
spark_df.cache()

# to json
import json
data_info=json.loads(spark_df.collect()[0][5])
with open('demo_one_request.json', 'w') as f:
    json.dump(data_info, f)
    
# to json array
pandas_df = spark_df.toPandas()

# get a list of dict
request_list = []       
for idx, row in pandas_df.iterrows():
    try:
        info = json.loads(row['processing_info'])
        request = info['ds_request']
        if request != None:
            request_list.append(request)
    except Exception as e:
        continue 

# dump the list to a json array
first_item = True
with open('demo_json_array.json', 'w') as out:
    out.write('[')
    for item in res:
        if first_item:
            out.write(json.dumps(item))
            first_item = False
        else:
            out.write("," + json.dumps(item))            
    out.write(']')

In [None]:
# write to parquet files
spark_df.write.parquet(hdfs_path,mode='overwrite')
# read from parquet files
spark_df=spark.read.parquet('hdfs:///projects/regds_fdy/foody_acceptance/shipper_features/shipper_test.parquet')

In [None]:
# hive
# demo: save hive table by day
def create_hive_table_by_day(df, parquet_loc, table_name, grass_date):

    """
    create hive table
    """
    # drop table if it is already exists.
    # spark.sql("drop table if exists {table_name}".format(table_name=table_name))

    df.createOrReplaceTempView('temp')
    columns = ', '.join(spark.sql('describe temp').rdd.map(lambda row: row[0] + ' ' + row[1]).collect())
    sql = '''
        create external table if not exists {table_name}
        (
            {columns}
        )
        partitioned by (grass_date string)
        stored as PARQUET
        location '{pqfile}'
    '''.format(table_name=table_name, columns=columns, pqfile=parquet_loc)
    spark.sql(sql)

    # drop partitions
    sql = '''
    ALTER TABLE {table_name} DROP IF EXISTS 
    PARTITION  (grass_date='{grass_date}')
    '''.format(grass_date=grass_date, table_name=table_name)
    spark.sql(sql)

    # add partition
    pqfile_path = parquet_loc + "/grass_date={}".format(grass_date)

    sql = '''
    ALTER TABLE {table_name} ADD IF NOT EXISTS
          PARTITION  (grass_date='{grass_date}')
          LOCATION '{pqfile}'
    '''.format(grass_date=grass_date, table_name=table_name, pqfile=pqfile_path)
    spark.sql(sql)
    print("Insert new data to Table %s with partition (grass_date: {})".format(grass_date))
    
request_log_df.write.mode("overwrite").parquet(cur_hdfs_path + partition)
create_hive_table_by_day(request_log_df, cur_hdfs_path, table_name, time_from)

## 3.1 Common RDD Operators 

In [31]:
# map
rdd = sc.parallelize(list(range(0, 10)), 3)
def map_func(x):
    return 2*x
rdd1 = rdd.map(map_func)
rdd1.take(10)

[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

In [29]:
# mapPartitions
def mappartition_func(part):
    for x in part:
        res = x*2
        yield res
        
rdd2 = rdd.mapPartitions(mappartition_func)
rdd2.take(10)

[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

In [35]:
# mapPartitionsWithIndex
def mappartitionswithindex_func(partitionindex, part):
    for x in part:
        res = x*2
        yield (partitionindex,res)
rdd3 = rdd.mapPartitionsWithIndex(mappartitionswithindex_func)
rdd3.take(10)    

[(0, 0),
 (0, 2),
 (0, 4),
 (1, 6),
 (1, 8),
 (1, 10),
 (2, 12),
 (2, 14),
 (2, 16),
 (2, 18)]

In [49]:
# flatMap
rdd = sc.parallelize([1, 2, 3, 4], 3)
rdd4 = rdd.flatMap(lambda x: (x, x))
rdd4.take(10)

[1, 1, 2, 2, 3, 3, 4, 4]

In [55]:
rdd4.getNumPartitions()

3

In [58]:
# glom
res = rdd4.glom()
def glom_map(x):
    return len(x)
rdd5 = res.map(glom_map)
rdd5.take(3)

[2, 2, 4]

In [116]:
# groupBy
rdd = sc.parallelize([1, 2, 3, 4], 3)
def groupby_func(x):
    return x % 2 == 0
grouped = rdd.groupBy(groupby_func) # grouped.getNumPartitions() = 400
grouped_list = grouped.map(lambda x: (list(x[1])))
grouped_list.map(lambda x: max(x)).collect()

[3, 4]

In [68]:
# distinct
rdd = sc.parallelize([5, 1, 2, 3, 4, 4, 5], 3)
rdd.distinct().collect()

[1, 2, 3, 4, 5]

In [69]:
# coalesce
rdd = sc.parallelize(list(range(0, 9)), 3)
# reduce 3 partitions to 2 by combining last two together
rdd.coalesce(2) 

CoalescedRDD[150] at coalesce at NativeMethodAccessorImpl.java:0

In [None]:
# repartitionAndSortWithinPartitions, now change partition number to 200
rdd.repartitionAndSortWithinPartitions(200)

In [26]:
# create df by rdd
rdd = sc.parallelize([(1,'Alice', 18),(2,'Andy', 19),(3,'Bob', 17),(4,'Justin', 21),(5,'Cindy', 20)])
schema = sqlt.StructType([
    sqlt.StructField("id", sqlt.IntegerType(), True),
    sqlt.StructField("name", sqlt.StringType(), True),
    sqlt.StructField("age", sqlt.IntegerType(), True)
])
spark_df = rdd.toDF(schema=schema)
spark_df.show()

+---+------+---+
| id|  name|age|
+---+------+---+
|  1| Alice| 18|
|  2|  Andy| 19|
|  3|   Bob| 17|
|  4|Justin| 21|
|  5| Cindy| 20|
+---+------+---+



## 3.2 Common DF Operations

In [12]:
# get the df
time_from = '2020-05-30 00:00:00'
time_to ='2020-06-02 00:00:00'
REQUEST_LOG_TAB = 'foody.foody_partner_db__order_assign_shipper_batch_processing_log_tab'
sql =  '''
        select id, city_id, processing_time, grass_date as date, create_time, processing_info, sharding 
        from {} 
        where from_unixtime(create_time) between '{}' and '{}'
        and city_id = 217
       '''.format(REQUEST_LOG_TAB, time_from, time_to)
spark_df = spark.sql(sql)
spark_df.cache()

DataFrame[id: decimal(20,0), city_id: bigint, processing_time: bigint, date: string, create_time: bigint, processing_info: string, sharding: string]

### Action

In [None]:
spark_df.count()
spark_df.collect()[0]['id'] 
spark_df.describe('date','create_time').show()
spark_df.first() # first row
spark_df.head() # first row, spark_df.head(2) # first two rows
spark_df.show() # return in df format

### Basics and Transformations

In [None]:
spark_df.columns # return a list of column names
spark_df.dtypes # return a list of tuples of (column name, data type)
spark_df.printSchema() # print column names and types in tree format
spark_df.schema # return column name and type in structType format
spark_df.createOrReplaceTempView('demo_df') # create temporary table later can qurey from

spark_df.cache() 
spark_df.unpersist

spark_df.groupBy() 
spark_df.agg()
spark_df.filter()
spark_df.orderBy()
spark_df.sample()

spark_df.dropDuplicates()
spark_df.na.drop() 
spark_df.dropna(subset=[])

spark_df.select()
spark_df.withColumnRenamed()
spark_df.withColumn()
spark_df.drop()
spark_df.explode()

spark_df.join()
spark_df.union()
spark_df.unionByName()
spark_df.subtract()
spark_df.intersect()

In [107]:
spark_df.columns

['id',
 'city_id',
 'processing_time',
 'date',
 'create_time',
 'processing_info',
 'sharding']

In [43]:
spark_df.describe('date','create_time').show()

+-------+----------+--------------------+
|summary|      date|         create_time|
+-------+----------+--------------------+
|  count|     93641|               93641|
|   mean|      null|1.5906830754114437E9|
| stddev|      null|   50898.43077166099|
|    min|2020-05-28|          1590598800|
|    max|2020-05-29|          1590771599|
+-------+----------+--------------------+



In [109]:
spark_df.printSchema()

root
 |-- id: decimal(20,0) (nullable = true)
 |-- city_id: long (nullable = true)
 |-- processing_time: long (nullable = true)
 |-- date: string (nullable = true)
 |-- create_time: long (nullable = true)
 |-- processing_info: string (nullable = true)
 |-- sharding: string (nullable = true)



In [110]:
spark_df.schema

StructType(List(StructField(id,DecimalType(20,0),true),StructField(city_id,LongType,true),StructField(processing_time,LongType,true),StructField(date,StringType,true),StructField(create_time,LongType,true),StructField(processing_info,StringType,true),StructField(sharding,StringType,true)))

In [114]:
spark_df.createOrReplaceTempView('demo_df') # create temporary table later can qurey from
spark.sql('select date, avg(processing_time) from demo_df where city_id = 217 group by date').show()

+----------+--------------------+
|      date|avg(processing_time)|
+----------+--------------------+
|2020-05-28|    738.388597067192|
|2020-05-29|   709.9667785864737|
+----------+--------------------+



In [115]:
#spark_df.agg(sqlf.min('processing_time'), sqlf.max('date')).show()
# avg, max, min, sum, count
spark_df.groupBy('date').agg(sqlf.min('processing_time'), sqlf.max('processing_time')).show()

+----------+--------------------+--------------------+
|      date|min(processing_time)|max(processing_time)|
+----------+--------------------+--------------------+
|2020-05-28|                  64|               13473|
|2020-05-29|                  76|               51463|
+----------+--------------------+--------------------+



duplicate, distinct values and fill na 

In [128]:
# drop duplicates based on columns
spark_df.dropDuplicates(['city_id','id'])
# count of distinct values in each column
spark_df.agg(*(sqlf.countDistinct(sqlf.col(c)).alias(c) for c in spark_df.columns)).show()

+-----+-------+---------------+----+-----------+---------------+--------+
|   id|city_id|processing_time|date|create_time|processing_info|sharding|
+-----+-------+---------------+----+-----------+---------------+--------+
|93641|      1|           3551|   2|      93603|          93641|       1|
+-----+-------+---------------+----+-----------+---------------+--------+



In [129]:
# percentage of missing value in each column
spark_df.agg(*[(1-(sqlf.count(c) /sqlf.count('*'))).alias(c+'_missing') for c in spark_df.columns]).show()

+----------+---------------+-----------------------+------------+-------------------+-----------------------+----------------+
|id_missing|city_id_missing|processing_time_missing|date_missing|create_time_missing|processing_info_missing|sharding_missing|
+----------+---------------+-----------------------+------------+-------------------+-----------------------+----------------+
|       0.0|            0.0|                    0.0|         0.0|                0.0|                    0.0|             0.0|
+----------+---------------+-----------------------+------------+-------------------+-----------------------+----------------+



In [None]:
# filter
spark_df.filter((spark_df['city_id']==217) & (spark_df['date']=='2020-05-28'))
# orderBy
spark_df.orderBy(['processing_time','date'], ascending=[True, False]).show()
# sample
tmp = spark_df.sample(False, 0.2, 42) # (withReplacement, fraction, random seed)

In [130]:
# drop na
spark_df.na.drop() # drop all rows that contain na
spark_df.dropna(subset=['id', 'city_id']) # drop rows based on na from subset of columns

# fill na
# fill na with 0 or any other value
spark_df = spark_df.fillna(0, subset=['city_id', 'create_time'])

# fill na with column mean or median
def fillna_with_mean(df, include=set()): # which columns to include
    mean_dict = dict()
    for c in include:
        mean_dict[c] = df.select(sqlf.mean(df[str(c)])).collect()[0][0]
    return df.na.fill(mean_dict)


def fillna_with_median(df, include=set()):
    median_dict = dict()
    for c in include:
        median_dict[c] = df.stat.approxQuantile(c, [0.5], 0.001)[0]
    return df.na.fill(median_dict)


def fillna_with_mean_exclude(df, exclude=set()): # which columns to exclude
    mean_dict = dict()
    for c in df.columns:
        if c not in exclude:
            mean_dict[c] = df.select(sqlf.mean(df[str(c)])).collect()[0][0]
    return df.na.fill(mean_dict)


def fillna_with_median_exclude(df, exclude=set()):
    median_dict = dict()
    for c in df.columns:
        if c not in exclude:
            median_dict[c] = df.stat.approxQuantile(c, [0.5], 0.001)[0]
    return df.na.fill(median_dict)

spark_df = fillna_with_median(spark_df, ['city_id','processing_time'])

In [137]:
# select (select columns)
spark_df.select('date','processing_time'+1)
# withColumnRenamed (rename a column)
spark_df.withColumnRenamed('date','date_new')
# withColumn, can add column (can write udf to add column), can used to change column type
# my udf
import json
def extract_num_order(data):
    num = 0
    data_dict = json.loads(str(data))
    if type(data_dict) == dict:
        if data_dict["ds_request"] != None:
            num = len(data_dict["ds_request"]["order_shippers"])
    return num

# common return type: LongType(), StringType(), IntegerType(), FloatType() 
# careful: default is String, if don't pass the second parameter
extract_num_order_udf = sqlf.udf(extract_num_order, sqlt.IntegerType()) # register udf

spark_df = spark_df.withColumn("num_order", extract_num_order_udf(spark_df['processing_info']))
# can also used to change column type (demo: change to StringType)
spark_df = spark_df.withColumn('processing_time', sqlf.col('processing_time').cast(sqlt.StringType()))
# spark_df = spark_df.withColumn('complete_datetime', sqlf.to_timestamp(sqlf.col('create_time')))

# drop (drop columns)
spark_df.drop('city_id','date')

# join/subtract/union of dataframes
# join: the join key of two df cannot have same names (rename first)
# join type: inner(default), outer, left, right
joined_df = df1.join(df2, (df1.res_id==df2.res_id1) & (df1.cid==df2.cid2), "left")
# union, unionByName
tmp = df1.union(df2) # just add rows togeter, doesn't consider column order
tmp = df1.unionByName(df2) # use when the order of columns of DFs are different
# substract
tmp = df1.subtract(df2) # return rows in df1 but not in df2
# intersect
tmp = df1.intersect(df2) # return rows in both df1 and df2

DataFrame[id: decimal(20,0), city_id: bigint, processing_time: bigint, date: string, create_time: bigint, processing_info: string, sharding: string]

## 4. Pyarrow and pandas_udf

In [4]:
# case 1 udf
import time
def which_day(data):
    if data == '2020-05-28':
        return 1
    else:
        return 2
which_day_udf = sqlf.udf(which_day, sqlt.IntegerType())

start = time.time()
spark_df = spark_df.withColumn("demo1", which_day_udf(spark_df['date']))
spark_df.count()
end = time.time()
print(end-start)        

25.261234998703003


In [6]:
# case 1 pandas_udf
from pyspark.sql.functions import pandas_udf
def which_day_pd(data):
    # the difference with normal udf is that you need to return a pd.series
    return pd.Series([1 if a == '2020-05-28' else 2 for a in data])
which_day_pd_udf = pandas_udf(which_day_pd, returnType=sqlt.IntegerType()) 

start = time.time()
spark_df = spark_df.withColumn("demo2", which_day_pd_udf(spark_df['date']))
spark_df.count()
end = time.time()
print(end-start)  

0.7254760265350342


In [7]:
# case2 udf
import json
def extract_num_order(data):
    num = 0
    data_dict = json.loads(str(data))
    if type(data_dict) == dict:
        if data_dict["ds_request"] != None:
            num = len(data_dict["ds_request"]["order_shippers"])
    return num
extract_num_order_udf = sqlf.udf(extract_num_order, sqlt.IntegerType())

start = time.time()
spark_df = spark_df.withColumn("demo4", extract_num_order_udf(spark_df['processing_info']))
spark_df.count()
end = time.time()
print(end-start) 

13.077404737472534


In [8]:
# case 2 pandas_udf
def extract_num_order_pd(data):
    res = []
    for a in data:
        num = 0
        data_dict = json.loads(str(a))
        if type(data_dict) == dict:
            if data_dict["ds_request"] != None:
                num = len(data_dict["ds_request"]["order_shippers"])
        res.append(num)
    return pd.Series(res)
extract_num_order_pd_udf = pandas_udf(extract_num_order_pd, returnType=sqlt.IntegerType()) 

start = time.time()
spark_df = spark_df.withColumn("demo5", extract_num_order_pd_udf(spark_df['processing_info']))
spark_df.count()
end = time.time()
print(end-start) 

7.39384126663208
