# Chapter 2
## Download data and put it into hdfs

In [1]:
#!curl -L -o ./data/donation.zip http://bit.ly/1Aoywaq

## Import packages and create a session

In [2]:
import gc
import logging
import subprocess
from datetime import datetime
from pathlib import Path

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import Row
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
# from pytz import timezone
# from pytz import utc

In [3]:
EXECUTOR_MEMORY = "2g"
EXECUTOR_CORES = 2
EXECUTORE_INSTANCES = 2
DRIVER_MEMORY = "2g"
DRIVER_MAX_RESULT_SIZE = "2g"

In [4]:
# Yarn resource manager
# spark = (
#     SparkSession.builder.appName(f"Chapter 2")
#     .master("yarn")
#     .config("spark.executor.memory", EXECUTOR_MEMORY)
#     .config("spark.executor.cores", EXECUTOR_CORES)
#     .config("spark.executor.instances", EXECUTORE_INSTANCES)
#     .config("spark.driver.memory", DRIVER_MEMORY)
#     .config("spark.driver.maxResultSize", DRIVER_MAX_RESULT_SIZE)
#     .config("spark.kryoserializer.buffer.max", "1024m")
#     .config("spark.sql.warehouse.dir", datapath)
#     .enableHiveSupport()
#     .getOrCreate()
# )

# spark.sparkContext.getConf().getAll()

# Standlone mode
spark = (
    SparkSession.builder.appName(f"Chapter 2")
    .master("local[*]")
    .config("spark.kryoserializer.buffer.max", "1024m")
#     .config("spark.sql.warehouse.dir", datapath)
    .enableHiveSupport()
    .getOrCreate()
)

spark.sparkContext.getConf().getAll()

[('spark.driver.port', '43751'),
 ('spark.driver.extraJavaOptions',
  '"-Dio.netty.tryReflectionSetAccessible=true"'),
 ('spark.kryoserializer.buffer.max', '1024m'),
 ('spark.executor.id', 'driver'),
 ('spark.app.name', 'Chapter 2'),
 ('spark.executor.extraJavaOptions',
  '"-Dio.netty.tryReflectionSetAccessible=true"'),
 ('spark.sql.catalogImplementation', 'hive'),
 ('spark.rdd.compress', 'True'),
 ('spark.app.id', 'local-1610558375734'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.driver.host', 'a687cedb43ea'),
 ('spark.master', 'local[*]'),
 ('spark.submit.pyFiles', ''),
 ('spark.submit.deployMode', 'client'),
 ('spark.ui.showConsoleProgress', 'true')]

## Load data
### Spark Dataframe API

In [5]:
df_schema = T.StructType([
    T.StructField("id_1", T.StringType(), True),
    T.StructField("id_2", T.StringType(), True),
    T.StructField("cmp_fname_c1", T.DoubleType(), True),
    T.StructField("cmp_fname_c2", T.DoubleType(), True),
    T.StructField("cmp_lname_c1", T.DoubleType(), True),
    T.StructField("cmp_lname_c2", T.DoubleType(), True),
    T.StructField("cmp_sex", T.ShortType(), True),
    T.StructField("cmp_bd", T.ShortType(), True),
    T.StructField("cmp_bm", T.ShortType(), True),
    T.StructField("cmp_by", T.ShortType(), True),
    T.StructField("cmp_plz", T.ShortType(), True),
    T.StructField("is_match", T.BooleanType(), True)
])

In [6]:
%time (\
    spark\
    .read.format("csv")\
    .option("header", True)\
    .option("nullValue", "?")\
    .option("inferSchema", True)\
    .schema(df_schema)\
    .load("/home/jovyan/work/data/ch02")\
).show()

spark_df = (
    spark
    .read.format("csv")
    .option("header", True)
    .option("nullValue", "?")
    .schema(df_schema)
    .load("/home/jovyan/work/data/ch02")
)

+-----+-----+------------+------------+------------+------------+-------+------+------+------+-------+--------+
| id_1| id_2|cmp_fname_c1|cmp_fname_c2|cmp_lname_c1|cmp_lname_c2|cmp_sex|cmp_bd|cmp_bm|cmp_by|cmp_plz|is_match|
+-----+-----+------------+------------+------------+------------+-------+------+------+------+-------+--------+
| 3148| 8326|         1.0|        null|         1.0|        null|      1|     1|     1|     1|      1|    true|
|14055|94934|         1.0|        null|         1.0|        null|      1|     1|     1|     1|      1|    true|
|33948|34740|         1.0|        null|         1.0|        null|      1|     1|     1|     1|      1|    true|
|  946|71870|         1.0|        null|         1.0|        null|      1|     1|     1|     1|      1|    true|
|64880|71676|         1.0|        null|         1.0|        null|      1|     1|     1|     1|      1|    true|
|25739|45991|         1.0|        null|         1.0|        null|      1|     1|     1|     1|      1|  

In [7]:
spark_df.printSchema()

root
 |-- id_1: string (nullable = true)
 |-- id_2: string (nullable = true)
 |-- cmp_fname_c1: double (nullable = true)
 |-- cmp_fname_c2: double (nullable = true)
 |-- cmp_lname_c1: double (nullable = true)
 |-- cmp_lname_c2: double (nullable = true)
 |-- cmp_sex: short (nullable = true)
 |-- cmp_bd: short (nullable = true)
 |-- cmp_bm: short (nullable = true)
 |-- cmp_by: short (nullable = true)
 |-- cmp_plz: short (nullable = true)
 |-- is_match: boolean (nullable = true)



### Pandas Dataframe

In [8]:
import pandas as pd
import numpy as np

file_path = "/home/jovyan/work/data/ch02"
files = Path(file_path).glob("b*.csv")
%time pandas_df = pd.concat([pd.read_csv(f, header='infer') for f in files])
%time pandas_df = pandas_df.replace({"?":None})

CPU times: user 5.53 s, sys: 635 ms, total: 6.17 s
Wall time: 6.17 s
CPU times: user 9.51 s, sys: 1.46 s, total: 11 s
Wall time: 11 s


In [9]:
pandas_df["id_1"] = pandas_df["id_1"].astype(np.int64, errors="ignore")
pandas_df["id_2"] = pandas_df["id_2"].astype(np.int64, errors="ignore")
pandas_df["cmp_fname_c1"] = pandas_df["cmp_fname_c1"].astype(np.float64, errors="ignore")
pandas_df["cmp_fname_c2"] = pandas_df["cmp_fname_c2"].astype(np.float64, errors="ignore")
pandas_df["cmp_lname_c1"] = pandas_df["cmp_lname_c1"].astype(np.float64, errors="ignore")
pandas_df["cmp_lname_c2"] = pandas_df["cmp_lname_c2"].astype(np.float64, errors="ignore")
pandas_df["cmp_sex"] = pandas_df["cmp_sex"].astype(np.int64, errors="ignore")
pandas_df["cmp_bd"] = pandas_df["cmp_bd"].astype(np.int64, errors="ignore")
pandas_df["cmp_by"] = pandas_df["cmp_by"].astype(np.int64, errors="ignore")
pandas_df["cmp_plz"] = pandas_df["cmp_plz"].astype(np.int64, errors="ignore")
pandas_df["is_match"] = pandas_df["is_match"].astype(np.bool, errors="ignore")

In [10]:
info = {}
info.update({"name" : pandas_df.columns})
info.update({"type" : pandas_df.dtypes})
info.update({"null_cnt" : [sum(pandas_df[col].isnull()) for col in pandas_df.columns]})

In [11]:
pd.DataFrame(info)

Unnamed: 0,name,type,null_cnt
id_1,id_1,int64,0
id_2,id_2,int64,0
cmp_fname_c1,cmp_fname_c1,float64,1007
cmp_fname_c2,cmp_fname_c2,float64,5645434
cmp_lname_c1,cmp_lname_c1,float64,0
cmp_lname_c2,cmp_lname_c2,float64,5746668
cmp_sex,cmp_sex,int64,0
cmp_bd,cmp_bd,object,795
cmp_bm,cmp_bm,object,795
cmp_by,cmp_by,object,795


### Using a cache
* 저장 레벨 
    * StorageLevel.MEMORY
    * StorageLevel.MEMORY_SER
    * StorageLevel.MEMORY_AND_DISK
    * StorageLevel.MEMORY_AND_DISK_SER

## Transformation
### 다양한 집계함수

In [12]:
spark_df.rdd.map(lambda x:x["is_match"]).countByValue()

defaultdict(int, {True: 20931, False: 5728201, None: 81})

In [13]:
(
    spark_df.groupBy("is_match")
    .count()
    .orderBy(F.col("count").desc())
).show()

+--------+-------+
|is_match|  count|
+--------+-------+
|   false|5728201|
|    true|  20931|
|    null|     81|
+--------+-------+



In [14]:
(
    spark_df.groupBy("is_match")
    .agg(
        F.count(F.col("is_match")).alias("COUNT"),
        F.countDistinct(F.col("is_match")).alias("DIST_COUNT"),
        F.sum(F.when((F.col("is_match") == "true"), 1).otherwise(0)).alias("TRUE_CNT"),
        F.sum(F.when((F.col("is_match") == "false"), 1).otherwise(0)).alias("FALSE_CNT"),
    )
).show()

+--------+-------+----------+--------+---------+
|is_match|  COUNT|DIST_COUNT|TRUE_CNT|FALSE_CNT|
+--------+-------+----------+--------+---------+
|    null|      0|         0|       0|        0|
|    true|  20931|         1|   20931|        0|
|   false|5728201|         1|       0|  5728201|
+--------+-------+----------+--------+---------+



### 요약 통계

In [15]:
summary = spark_df.describe()

In [16]:
spark_df.where(F.col("is_match") == "true").describe().show()
spark_df.where(F.col("is_match") == "false").describe().show()

matched_summary = spark_df.where(F.col("is_match") == "true").describe()
miss_summary = spark_df.where(F.col("is_match") == "false").describe()

+-------+-----------------+-----------------+-------------------+-------------------+--------------------+-------------------+-------------------+-------------------+--------------------+-------------------+-------------------+
|summary|             id_1|             id_2|       cmp_fname_c1|       cmp_fname_c2|        cmp_lname_c1|       cmp_lname_c2|            cmp_sex|             cmp_bd|              cmp_bm|             cmp_by|            cmp_plz|
+-------+-----------------+-----------------+-------------------+-------------------+--------------------+-------------------+-------------------+-------------------+--------------------+-------------------+-------------------+
|  count|            20931|            20931|              20922|               1333|               20931|                475|              20931|              20925|               20925|              20925|              20902|
|   mean|34575.72117911232|51259.95939037791| 0.9973163859635038| 0.9898900320318176|  0

### 테이블 축 전환

In [17]:
metrics = summary.toPandas().iloc[:, 0]
feature_names = summary.toPandas().columns[1:]

In [18]:
agg_functions = [
    f"F.{metric}('{name}').alias('{name}_{metric}')"
    for name in feature_names
    for metric in metrics
]
agg_functions = ",".join(agg_functions)

In [19]:
agg_functions

"F.count('id_1').alias('id_1_count'),F.mean('id_1').alias('id_1_mean'),F.stddev('id_1').alias('id_1_stddev'),F.min('id_1').alias('id_1_min'),F.max('id_1').alias('id_1_max'),F.count('id_2').alias('id_2_count'),F.mean('id_2').alias('id_2_mean'),F.stddev('id_2').alias('id_2_stddev'),F.min('id_2').alias('id_2_min'),F.max('id_2').alias('id_2_max'),F.count('cmp_fname_c1').alias('cmp_fname_c1_count'),F.mean('cmp_fname_c1').alias('cmp_fname_c1_mean'),F.stddev('cmp_fname_c1').alias('cmp_fname_c1_stddev'),F.min('cmp_fname_c1').alias('cmp_fname_c1_min'),F.max('cmp_fname_c1').alias('cmp_fname_c1_max'),F.count('cmp_fname_c2').alias('cmp_fname_c2_count'),F.mean('cmp_fname_c2').alias('cmp_fname_c2_mean'),F.stddev('cmp_fname_c2').alias('cmp_fname_c2_stddev'),F.min('cmp_fname_c2').alias('cmp_fname_c2_min'),F.max('cmp_fname_c2').alias('cmp_fname_c2_max'),F.count('cmp_lname_c1').alias('cmp_lname_c1_count'),F.mean('cmp_lname_c1').alias('cmp_lname_c1_mean'),F.stddev('cmp_lname_c1').alias('cmp_lname_c1_stdd

In [20]:
result = eval(f"spark_df.groupBy('is_match').agg({agg_functions}).toPandas().set_index('is_match')")
resultT = result.T
resultT = resultT.reset_index()

In [21]:
# spark transposeDF 참고
# def TransposeDF(df, columns, pivotCol):
#     columnsValue = list(map(lambda x: str("'") + str(x) + str("',")  + str(x), columns))
#     stackCols = ','.join(x for x in columnsValue)
#     df_1 = (
#         df
#         .selectExpr(pivotCol, "stack(" + str(len(columns)) + "," + stackCols + ")")
#         .select(pivotCol, "col0", "col1")
#     )
#     final_df = (
#         df_1.groupBy(col("col0")).pivot(pivotCol).agg(concat_ws("", collect_list(col("col1"))))
#         .withColumnRenamed("col0", pivotCol)
#     )
#     return final_df

### Selecting features

In [22]:
resultT["matrix"] = resultT["index"].apply(lambda s : s.split("_")[-1])
resultT["field"] = resultT["index"].apply(lambda s : "_".join(s.split("_")[:-1]))

In [23]:
resultT

is_match,index,NaN,True,False,matrix,field
0,id_1_count,81,20931.0,5728201.0,count,id_1
1,id_1_mean,0.000235405,34575.7,33319.9,mean,id_1
2,id_1_stddev,,21950.3,23665.8,stddev,id_1
3,id_1_min,1 goal field),10001.0,1.0,min,id_1
4,id_1_max,9. Class Distribution: 20.931 matches,99946.0,9999.0,max,id_1
5,id_2_count,26,20931.0,5728201.0,count,id_2
6,id_2_mean,0.0014771,51260.0,66643.4,mean,id_2
7,id_2_stddev,,24345.7,23599.6,stddev,id_2
8,id_2_min,2 non-predictive,10010.0,10000.0,min,id_2
9,id_2_max,0.00147710487444609,99996.0,99999.0,max,id_2
