In [1]:
import numpy as np
import pandas as pd
import pyspark
import datetime

import time
import calendar
import time


from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from pyspark.sql.functions import col,input_file_name
from pyspark.sql.types import IntegerType, DateType, LongType, TimestampType

In [2]:
# execute spark engine 
spark = (SparkSession
              .builder\
              .master("local[*]")\
              .appName("spark_performace")\
              .config("spark.hadoop.fs.s3a.impl', 'org.apache.hadoop.fs.s3a.S3AFileSystem")\
              .config("spark.hadoop.fs.s3a.aws.credentials.provider", "com.amazonaws.auth.EnvironmentVariableCredentialsProvider")\
              .config("spark.debug.maxToStringFields", 2000)\
              .config("spark.sql.broadcastTimeout",  36000)\
              .config("spark.sql.shuffle.partitions", 128)\
              .config("spark.default.parallelism", 16)\
              .getOrCreate())

In [None]:
# read data from s3
sample_url = "s3a://aws-test-benny/assessment/target_user_sample/*.parquet"
basic_url = "s3a://aws-test-benny/assessment/basic/*.parquet"
exp_url = "s3a://aws-test-benny/assessment/exp_job/*.parquet"
cid_url = "s3a://aws-test-benny/assessment/cid_mapping/*.parquet"

sample_df= spark \
    .read \
    .parquet(sample_url)

basic_df= spark \
    .read \
    .parquet(basic_url)

exp_df = spark \
    .read \
    .parquet(exp_url)

cid_df = spark \
    .read \
    .parquet(cid_url)
    

#sample_df.printSchema()
#basic_df.printSchema()
#exp_df.printSchema()
#cid_df.printSchema()

In [3]:
# read data from database
url = "your db host"
dbname = "your db name "
driver = "com.mysql.cj.jdbc.Driver"
user = "your db user"
password = "your db password"

sample_dbtable = "target_user_sample"
basic_dbtable = "(SELECT id_no,update_date FROM basic) AS basic"
exp_dbtable = "(SELECT pkey, id_no, invoice, ind_cat_no, job_cat_no, start_date, end_date FROM exp_job) AS exp"
cid_dbtable = "(SELECT invoice, cid FROM cid_mapping) AS cid"

start_time = datetime.datetime.now()
sample_df= spark \
    .read \
    .format("jdbc") \
    .option("url", url) \
    .option("driver", driver) \
    .option("dbtable", sample_dbtable) \
    .option("user", user) \
    .option("password", password) \
    .option("numPartitions",9) \
    .option("partitionColumn","finish_date") \
    .option("lowerBound", "2012-01-01 00:00:00") \
    .option("upperBound","2019-01-01 00:00:00") \
    .option("timestampFormat", "yyyy-mm-dd hh:mm:ss") \
    .load()

basic_df= spark \
    .read \
    .format("jdbc") \
    .option("url", url) \
    .option("driver", driver) \
    .option("dbtable", basic_dbtable)\
    .option("user", user) \
    .option("password", password) \
    .option("numPartitions",9) \
    .option("fetchsize",500) \
    .load()

exp_df = spark \
    .read \
    .format("jdbc") \
    .option("url", url) \
    .option("driver", driver) \
    .option("dbtable",exp_dbtable) \
    .option("user", user) \
    .option("password", password) \
    .option("numPartitions", 8) \
    .option("partitionColumn","start_date") \
    .option("lowerBound", "1993-01-01 00:00:00") \
    .option("upperBound","2020-01-01 00:00:00") \
    .option("timestampFormat", "yyyy-mm-dd hh:mm:ss") \
    .load()

cid_df = spark \
    .read \
    .format("jdbc") \
    .option("url", url) \
    .option("driver", driver) \
    .option("dbtable", cid_dbtable) \
    .option("user", user) \
    .option("password", password) \
    .option("numPartitions", 8) \
    .load()

In [10]:
# exp JOIN basic ON idno JOIN sample_df ON idno
# job_cat_no>0 or ind_cat_no>0

join_df = exp_df \
            .where((f.col("job_cat_no")>0) | (f.col("ind_cat_no")>0))\
            .join(basic_df,["id_no"]) \
            .join(sample_df, exp_df.id_no == sample_df.idno,'inner')

In [11]:
# first_transform_df
# 取代 end_date
#
# groupby pkey 取代 difftime drop duplicates
# difftime = abs(((end_date-start_date)/2+start_date)-finish_date)
# sm=  (end_y-start_y)*12+(end_m-start_m)
# sy={
#      sm=0 , 2+log(0.5/3)/log(2);
#      o.w. ,  2+log(sm/3)/log(2)
#         }
# 先把邏輯做出來
replace_condition = (f.datediff(f.col("end_date"),f.col("start_date"))<0)
middle_date_diff = f.datediff(f.col("end_date"),f.col("start_date"))/2
update_date_diff = f.datediff(f.col("start_date"),f.col("finish_date"))
sm = (f.col("end_y") - f.col("start_y")) * 12 + (f.col("end_m") - f.col("start_m"))
sy_default = f.lit(2)+f.lit(f.log(f.lit(0.5) / 3) / f.log(f.lit(2)))
sy_by_sm = f.lit(2)+f.lit(f.log(f.lit(f.col("sm")) / 3) / f.log(f.lit(2)))

In [12]:
# apply 到 dataframe
tmp_df = join_df \
    .withColumn("end_date", \
                f.when(replace_condition, join_df["update_date"]) \
                 .otherwise(join_df["end_date"]) \
                      )\
    .withColumn("end_y", f.year(f.col("end_date")))\
    .withColumn("end_m", f.month(f.col("end_date"))) \
    .withColumn("start_y", f.year(f.col("start_date")))\
    .withColumn("start_m",f.month(f.col("start_date"))) \
    .withColumn("sm", f.when(sm>750, f.lit(750)).otherwise(sm)) \
    .withColumn("sy", f.when(f.col("sm")==0,sy_default).otherwise(sy_by_sm))

first_transform_df = tmp_df \
    .withColumn("diff_time", f.abs(middle_date_diff+update_date_diff)) \
    .orderBy(["pkey","diff_time"], ascending = True) \
    .coalesce(1) \
    .dropDuplicates(["pkey"])\
    .drop("pkey","finish_date","start_date","end_date","update_date","idno","end_y","end_m","start_y","start_m","diff_time")

In [13]:
# order by  cid select where cid[0]
# JOIN cid ON invoice
# fill cid = 0 when invoice == 0 or invoice is null

cid_select_df = cid_df \
                .select("cid","invoice") \
                .orderBy(["invoice","cid"], ascending = True) \
                .coalesce(1) \
                .dropDuplicates(["invoice"])

results = first_transform_df \
          .join(cid_select_df,["invoice"],"left").na.fill({'cid': 0}) \
          .withColumn("cid", f.when((f.col("invoice")==0) | (f.col("cid")==0) , f.lit(0))\
                    .otherwise(f.col("cid"))) 

# write output
results.write.format("parquet").mode("overwrite").save("s3a://aws-test-benny/sharon_inside/result.parquet")

#execution time
execution_time = datetime.datetime.now() - start_time

In [8]:
print(f"Spend time: {execution_time}")

Spend time: 0:05:07.085671


In [9]:
results.count()

2214509