In [1]:
import numpy as np
import pandas as pd
from pyspark.sql import SparkSession, SQLContext
import pyspark.sql.functions as F
from time import time
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression, LogisticRegressionModel
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from sklearn.metrics import roc_curve, auc, precision_recall_curve, average_precision_score
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix
import utility
%load_ext blackcellmagic


ModuleNotFoundError: No module named 'pyspark'

In [None]:
# Build (or reuse) the local Spark session that the rest of the notebook relies on.
spark_settings = {
    "spark.driver.memory": "16g",
    "spark.driver.extraJavaOptions": "-Xss10m",
    "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
    "spark.cores.max": "6",
}

session_builder = SparkSession.builder.master("local[*]").appName("IQreqTypeSuggester")
for setting_name, setting_value in spark_settings.items():
    session_builder = session_builder.config(setting_name, setting_value)
spark = session_builder.getOrCreate()

# Expose the underlying SparkContext and a SQLContext handle for the SQL cells.
sc = spark.sparkContext
sc.setCheckpointDir("checkpoint/")
sqlContext = SQLContext(sc)
sqlContext.setConf("spark.sql.parquet.binaryAsString", "true")

# Last expression of the cell: display the Spark version.
spark.version


## Read Data

In [None]:
# Input location has the form <data_dir><name>.<start_date>.<end_date>.parquet
data_dir = "data/"
start_date = "2018-11-01"
end_date = "2019-02-01"
name = ""  # empty here; presumably a placeholder for the dataset name
data_path = f"{data_dir}{name}.{start_date}.{end_date}.parquet"

# Read the Parquet data and mark it for caching.
df_name_1 = spark.read.parquet(data_path)
df_name_1 = df_name_1.persist()

# Open question: what is gained by repartitioning, i.e. what can go wrong without it?
#df.repartition(400).write.mode("overwrite").parquet(data_dir + start_date + '_' + end_date + '-repartitioned.parquet')

# Keep a single row per (A, B) combination.
df_name_1 = df_name_1.drop_duplicates(subset=["A", "B"])


### clean and explore data
**by row**: 
* check and drop na
* drop duplicates

**by column**: 
* number of unique values for each categorical column (watch for empty strings and `None` in string-valued columns)
* summary statistics for each numerical column

In [None]:
# Inspect the structure and basic statistics of the first dataset.
df_name_1.printSchema()
# Spark DataFrames have no pandas-style .info() (calling it raises AttributeError),
# so the row and column counts are reported explicitly instead.
print(f"rows: {df_name_1.count()}, columns: {len(df_name_1.columns)}")
cols_to_describe = ["A", "B", "C"]
# describe() yields a small summary table, which is safe to collect to pandas for display.
df_name_1.select(*cols_to_describe).describe().toPandas()


In [None]:
# Optional aggregation step: collapse the data to one row per (A, B) group.
group_key = ['A', 'B']
# Maps each column to the aggregate function applied to it.
aggregation = {'C': 'mean',
               'D': 'sum',
               'E': 'count'}

# GroupedData.agg() returns a flat Spark DataFrame in which the grouping keys are
# ordinary columns, so no pandas-style .reset_index() is needed; Spark DataFrames do
# not have that method and calling it raises AttributeError.
df_name_1 = df_name_1.groupby(group_key).agg(aggregation)


In [None]:
# Register the DataFrame as a temporary SQL view named "df_name_1" so SQL queries can refer to it.
df_name_1.createOrReplaceTempView("df_name_1")


## Read Data_2

In [None]:
# Input location has the form <data_dir><name>.<start_date>.<end_date>.parquet
data_dir = "data/"
start_date = "2018-11-01"
end_date = "2019-02-01"
name = ""  # empty here; presumably a placeholder for the dataset name
data_path = f"{data_dir}{name}.{start_date}.{end_date}.parquet"

# Read the Parquet data and mark it for caching.
df_name_2 = spark.read.parquet(data_path)
df_name_2 = df_name_2.persist()

# Open question: what is gained by repartitioning, i.e. what can go wrong without it?
#df.repartition(400).write.mode("overwrite").parquet(data_dir + start_date + '_' + end_date + '-repartitioned.parquet')

# Keep a single row per (A, B) combination.
df_name_2 = df_name_2.drop_duplicates(subset=["A", "B"])


In [None]:
# Inspect the structure and summary statistics of the second dataset.
df_name_2.printSchema()
cols_to_describe = ["A", "B", "C"]
summary_2 = df_name_2.select(*cols_to_describe).describe()
# The summary is collected to pandas for display.
summary_2.toPandas()


In [None]:
# Optional aggregation step: collapse the data to one row per (A, B) group.
group_key = ['A', 'B']
# Maps each column to the aggregate function applied to it.
aggregation = {'C': 'mean',
               'D': 'sum',
               'E': 'count'}

# GroupedData.agg() returns a flat Spark DataFrame in which the grouping keys are
# ordinary columns, so no pandas-style .reset_index() is needed; Spark DataFrames do
# not have that method and calling it raises AttributeError.
df_name_2 = df_name_2.groupby(group_key).agg(aggregation)


In [None]:
# Register the DataFrame as a temporary SQL view named "df_name_2" so SQL queries can refer to it.
df_name_2.createOrReplaceTempView("df_name_2")


## Join Data

In [None]:
# Left-join the two temporary views and cache the result.
# The table names must be the names the views are registered under with
# createOrReplaceTempView ("df_name_1" and "df_name_2"); "df_1" / "df_2" are never
# registered, so a query against them fails with a table-not-found error.
df = sqlContext.sql("""
  SELECT a.*, b.z as z
  FROM df_name_1 a left join df_name_2 b on (a.x = b.x) and (a.y = b.y)
  """).persist()


In [None]:
# Replace nulls per column: nulls in "A" become 0, nulls in "B" become 1.
na_fill_scheme = dict(A=0, B=1)
df = df.fillna(na_fill_scheme)

In [None]:
# Optional aggregation step: collapse the joined data to one row per (A, B) group.
group_key = ['A', 'B']
# Maps each column to the aggregate function applied to it.
aggregation = {'C': 'mean',
               'D': 'sum',
               'E': 'count'}

# GroupedData.agg() returns a flat Spark DataFrame in which the grouping keys are
# ordinary columns, so no pandas-style .reset_index() is needed; Spark DataFrames do
# not have that method and calling it raises AttributeError.
df = df.groupby(group_key).agg(aggregation)


In [None]:
# Inspect the structure and basic statistics of the joined dataset.
df.printSchema()
# Spark DataFrames have no pandas-style .info() (calling it raises AttributeError),
# so the row and column counts are reported explicitly instead.
print(f"rows: {df.count()}, columns: {len(df.columns)}")
cols_to_describe = ["A", "B", "C"]
# describe() yields a small summary table, which is safe to collect to pandas for display.
df.select(*cols_to_describe).describe().toPandas()


In [None]:
# Narrow the DataFrame to the listed columns, drop fully duplicated rows, and cache the result.
cols_to_select = ["A", "B"]
selected_columns = df.select(*cols_to_select)
df = selected_columns.drop_duplicates().persist()


In [None]:
# Register the DataFrame as a temporary SQL view named "df" so SQL queries can refer to it.
df.createOrReplaceTempView("df")

# Appendix: standard operations

## column operations

### Create new column based on existing ones

In [None]:
# Derive column "A" from columns "B" and "C", then add a constant column "D" equal to 1.
complement_product = (1 - F.col('B')) * (1 - F.col('C'))
df = df.withColumn('A', complement_product).withColumn('D', F.lit(1))
df.printSchema()

In [None]:
# Rename column "old_name" to "new_name".
df = df.withColumnRenamed("old_name", "new_name")

In [None]:
# Two examples of turning a boolean condition into an integer indicator column.
# Both write to "new_col", so the second assignment replaces the result of the first.
df = df.withColumn("new_col", (df.A>0).cast("integer"))
df = df.withColumn("new_col", ((df.A-df.B)>0).cast("integer"))


### drop column

In [None]:
# Remove column "A" from the DataFrame.
df = df.drop("A")

### sort according to one column

In [None]:
# Order rows by column "A", descending. The result is not assigned to a name here,
# so the cell only displays the returned DataFrame.
df.sort(F.desc("A"))

### filter to create subset

In [None]:
# Keep rows where A or B (or both) takes one of the listed values.
df_sub = df.filter(df.A.isin(["value1", "value2"]) | df.B.isin(['value1','value2']))

In [None]:
# Keep rows where neither A nor B takes one of the listed values.
df_sub = df.filter(~df.A.isin(["value1", "value2"]) & ~df.B.isin(['value1','value2']))

In [None]:
# Keep rows where A is strictly positive, written with a column expression.
df_sub = df.filter(F.col('A')>0)

In [None]:
# Keep rows where A equals 1, written with where() and attribute-style column access.
df_sub = df.where(df.A==1)


### count distinct values of each column

In [None]:
# One distinct-count aggregate per column, each aliased back to its column name.
distinct_counts = [
    F.countDistinct(F.col(column_name)).alias(column_name)
    for column_name in df.columns
]
# The aggregation produces a single row, which is collected to pandas for display.
df.agg(*distinct_counts).toPandas()

### UDF

In [None]:
def fun(label, posPercentage):
    """Return a weight derived from a label and a positive-class fraction.

    label: the row's label; a value equal to 1 selects the first formula
    posPercentage: fraction used in the weight formula

    Returns 1 / (2 * posPercentage) when label equals 1, and
    1 / (2 * (1 - posPercentage)) for any other label.
    """
    if label == 1:
        return 1/(posPercentage * 2)
    return 1/((1-posPercentage)*2)


In [None]:
# Wrap the plain Python function `fun` as a Spark UDF and use it to create a new
# column out of existing columns.
# Only the needed types are imported, instead of `from pyspark.sql.types import *`,
# so it stays clear where each name comes from. IntegerType is included because a
# later UDF cell uses it.
from pyspark.sql.types import FloatType, IntegerType

udf_name = F.udf(fun, returnType=FloatType())
# Column "A" = fun(B, 0.5); the constant argument is passed as a literal column.
df = df.withColumn("A", udf_name('B', F.lit(0.5)))


In [None]:
def rosetta_category(attributes, category):
    '''
    functionality: check whether any attribute starts with the given category
    attributes: iterable of rosetta attribute strings, e.g. the list obtained by
        splitting the comma-separated attribute string; None or any other
        non-iterable value gives 0
    category: the category of rosetta attribute, used as the prefix to look for
    returns: 1 if at least one attribute starts with category, otherwise 0

    Entries that cannot be sliced (for example None) are skipped, so one bad
    entry does not hide a matching attribute that comes later in the list.
    '''
    try:
        # Loop-invariant; also rejects a category without a length (e.g. None).
        prefix_len = len(category)
        candidates = iter(attributes)
    except TypeError:
        return 0

    for attribute in candidates:
        try:
            if attribute[:prefix_len] == category:
                return 1
        except (AttributeError, TypeError):
            # Slicing None or a number raises TypeError, not AttributeError;
            # both are treated as "not a match" and the scan continues.
            continue
    return 0
    

In [None]:
# Split the comma-separated attribute string into an array column.
attribute_list_col = "verifiedExtractedAttributeList"
df = df.withColumn(attribute_list_col, F.split("verifiedExtractedAttribute", ","))

# Wrap rosetta_category as a UDF: a user-defined function applied to the column
# values of every row, comparable to a lambda function in Python.
udf_rosetta_category = F.udf(rosetta_category, returnType=IntegerType())

# Single-category example: flag column for "skill".
skill_flag = udf_rosetta_category(attribute_list_col, F.lit("skill"))
df = df.withColumn("skill", skill_flag)

# The same flag for every category, one column per category.
rosetta_categories = ['skill', 'education', 'language', 'license']
for category in rosetta_categories:
    category_flag = udf_rosetta_category(attribute_list_col, F.lit(category))
    df = df.withColumn(category, category_flag)

In [None]:
# Minimal, self-contained UDF example on a toy DataFrame.
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf


def maturity(age):
    """Classify an age as "adult" (18 or older) or "child"; a missing age stays None."""
    if age is None:
        # Null column values arrive as None; comparing None with an int raises TypeError.
        return None
    return "adult" if age >= 18 else "child"


maturity_udf = udf(maturity, StringType())

# The toy data gets its own names so the working DataFrame `df` is not replaced;
# otherwise later cells that use `df` would operate on this one-row example.
# Rows are given as tuples with explicit column names rather than as dicts.
people_df = sqlContext.createDataFrame([("Alice", 1)], ["name", "age"])
people_with_maturity = people_df.withColumn("maturity", maturity_udf(people_df.age))
people_with_maturity


### Save

In [None]:
# Collect the Spark DataFrame into a pandas DataFrame, then write it out as a CSV file.
pandas_df = df.toPandas()
pandas_df.to_csv("name.csv", header=True, index=False, encoding='utf-8')
