In [1]:
import findspark
findspark.init() 

In [2]:
import os
import time
import datetime
import pyspark.sql.functions as sf
from uuid import *
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql.functions import when
from pyspark.sql.functions import col
from pyspark.sql.types import *
from pyspark.sql.functions import lit
from pyspark import SparkConf, SparkContext
from uuid import *
from uuid import UUID
import time_uuid
from pyspark.sql import Row
from pyspark.sql.functions import udf
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.window import Window as W

In [3]:
spark = SparkSession.builder.config("spark.jars.packages",'com.datastax.spark:spark-cassandra-connector_2.12:3.1.0').getOrCreate()


In [11]:
#ETL data

def process_timeuuid(df):
    spark_time = df.select('create_time').collect()
    normal_timestamp = []
    for i in range(len(spark_time)):
        normal_timestamp.append(str(time_uuid.TimeUUID(bytes=UUID(spark_time[i][0]).bytes).get_datetime().strftime('%Y-%m-%d %H:%M:%S')))
    normal_timestamp
    spark_timeuuid = []
    for i in range(len(spark_time)):
        spark_timeuuid.append(spark_time[i][0])
    time_data = spark.createDataFrame(zip(spark_timeuuid,normal_timestamp),['create_time','ts'])
    result = df.join(time_data,df.create_time==time_data.create_time,'inner').drop(df.create_time)
    result = result.select('create_time','ts','bid','campaign_id','custom_track','group_id','job_id','publisher_id')
    return result

def calculating_clicks(processed_df):
    clicks_data = processed_df.filter(processed_df.custom_track == 'clicks')
    clicks_data = clicks_data.na.fill({'bid': 0})
    clicks_data = clicks_data.na.fill({'publisher_id': 0})
    clicks_data = clicks_data.na.fill({'job_id': 0})
    clicks_data = clicks_data.na.fill({'group_id': 0})
    clicks_data = clicks_data.na.fill({'campaign_id': 0})
    clicks_data.createOrReplaceTempView('clicks_data')
    clicks_output = spark.sql(""" with cte1 as (select create_time,bid,job_id,publisher_id,date(ts) as dates,hour(ts) as hours from clicks_data) 
    select job_id,publisher_id,dates,hours,sum(bid) as spend_hour, round(avg(bid),2) as bid_set,count(create_time) as clicks 
    from cte1 
    group by publisher_id,job_id,dates,hours""")
    return clicks_output 

def calculating_conversion(processed_df):
    conversion_data = processed_df.filter(processed_df.custom_track == 'conversion')
    conversion_data = conversion_data.na.fill({'bid': 0})
    conversion_data = conversion_data.na.fill({'publisher_id': 0})
    conversion_data = conversion_data.na.fill({'job_id': 0})
    conversion_data = conversion_data.na.fill({'group_id': 0})
    conversion_data = conversion_data.na.fill({'campaign_id': 0})
    conversion_data.createOrReplaceTempView('conversion_data')
    conversion_output = spark.sql(""" with cte1 as (select create_time,bid,job_id,publisher_id,date(ts) as dates,hour(ts) as hours from conversion_data) 
    select job_id,publisher_id,dates,hours,sum(bid) as spend_hour, round(avg(bid),2) as bid_set,count(create_time) as conversion 
    from cte1 
    group by publisher_id,job_id,dates,hours""")
    return conversion_output 

def calculating_qualified(processed_df):
    qualified_data = processed_df.filter(processed_df.custom_track == 'qualified')
    qualified_data = qualified_data.na.fill({'bid': 0})
    qualified_data = qualified_data.na.fill({'publisher_id': 0})
    qualified_data = qualified_data.na.fill({'job_id': 0})
    qualified_data = qualified_data.na.fill({'group_id': 0})
    qualified_data = qualified_data.na.fill({'campaign_id': 0})
    qualified_data.createOrReplaceTempView('qualified_data')
    qualified_output = spark.sql(""" with cte1 as (select create_time,bid,job_id,publisher_id,date(ts) as dates,hour(ts) as hours from qualified_data) 
    select job_id,publisher_id,dates,hours,sum(bid) as spend_hour, round(avg(bid),2) as bid_set,count(create_time) as qualified_application 
    from cte1 
    group by publisher_id,job_id,dates,hours""")
    return qualified_output 

def calculating_unqualified(processed_df):
    unqualified_data = processed_df.filter(processed_df.custom_track == 'qualified')
    unqualified_data = unqualified_data.na.fill({'bid': 0})
    unqualified_data = unqualified_data.na.fill({'publisher_id': 0})
    unqualified_data = unqualified_data.na.fill({'job_id': 0})
    unqualified_data = unqualified_data.na.fill({'group_id': 0})
    unqualified_data = unqualified_data.na.fill({'campaign_id': 0})
    unqualified_data.createOrReplaceTempView('unqualified_data')
    unqualified_output = spark.sql(""" with cte1 as (select create_time,bid,job_id,publisher_id,date(ts) as dates,hour(ts) as hours from unqualified_data) 
    select job_id,publisher_id,dates,hours,sum(bid) as spend_hour, round(avg(bid),2) as bid_set,count(create_time) as disqualified_application 
    from cte1 
    group by publisher_id,job_id,dates,hours""")
    return unqualified_output 

def processing_cassandra_output(processed_df):
    clicks_output = calculating_clicks(processed_df)
    conversion_output = calculating_conversion(processed_df)
    qualified_output = calculating_qualified(processed_df)
    unqualified_output = calculating_unqualified(processed_df)
    cassandra_output = clicks_output.join(conversion_output,['job_id', 'publisher_id', 'dates', 'hours'],'full').join(qualified_output,['job_id', 'publisher_id', 'dates', 'hours'],'full').join(unqualified_output,['job_id', 'publisher_id', 'dates', 'hours'],'full').drop(conversion_output.job_id).drop(conversion_output.publisher_id)\
    .drop(conversion_output.dates).drop(conversion_output.hours).drop(conversion_output.spend_hour).drop(conversion_output.bid_set).drop(qualified_output.bid_set).drop(qualified_output.job_id).drop(qualified_output.publisher_id).drop(qualified_output.dates).drop(qualified_output.hours).drop(qualified_output.spend_hour).drop(qualified_output.bid_set).drop(unqualified_output.job_id).drop(unqualified_output.publisher_id).drop(unqualified_output.dates).drop(unqualified_output.hours).drop(unqualified_output.spend_hour).drop(unqualified_output.bid_set)
    return cassandra_output

def mapping_mysql_fields(cassandra_output,url,driver,user,password):
    sql = """(select id as job_id,campaign_id , group_id , company_id from job) test"""
    company_id = spark.read.format('jdbc').options(url=url, driver=driver, dbtable=sql, user=user, password=password).load()
    output = cassandra_output.join(company_id,cassandra_output.job_id == company_id.job_id,'left').drop(company_id.job_id)
    return output

def import_to_mysql(url,driver,user,password,output):
    output.write.format('jdbc').option('url', url).option('driver', driver).option('dbtable', 'events_etl').option('user', user).option('password', password).mode('append').save()
    return print("Data Import Successfully")

In [12]:
def main_task_real_time(time_mysql,time_cassandra,now):
#Read data from Cassandra 
    df = spark.read.format("org.apache.spark.sql.cassandra").options(table="tracking",keyspace="keyspace_de").load()
    df.show()
#mysql_df = spark.read.format('jdbc').options(url=url, driver=driver, dbtable=sql, user=user, password=password).load()
#mysql_df.show()
    print("--------------------")
    print("Reading data from Cassandra")
    print("--------------------")
    df = df.select('create_time','bid','campaign_id','custom_track','group_id','job_id','publisher_id')
    df.show(10)
    print("--------------------")
    print("--------------------")
    print("Process time data from Cassandra ")
    print("--------------------")
    processed_df = process_timeuuid(df)
    print("--------------------")
    print("Finalizing data from Cassandra ")
    print("--------------------")
    cassandra_output = processing_cassandra_output(processed_df)
    cassandra_output.show(10)
    print("--------------------")
    print("Mapping MySQL fields into Cassandra")
    print("--------------------")
    output = mapping_mysql_fields(cassandra_output,url,driver,user,password)
    output.show(10)
    print("--------------------")
    print("Importing Data to Data Warehouse")
    print("--------------------")
    import_to_mysql(url,driver,user,password,output)
    print("Job Finished")

In [13]:
#Read data from MySQL 
host = 'localhost'
port = '3306'
db_name = 'Data_Warehouse'
user = 'root'
password = '1'
url = 'jdbc:mysql://' + host + ':' + port + '/' + db_name
driver = "com.mysql.cj.jdbc.Driver"
sql = """(select * from events) test"""

In [14]:
def etl_task_realtime(is_loop):
    while is_loop == True:
        time_mysql = spark.read.format('jdbc').options(url=url, driver=driver, dbtable="""(select max(latest_modified_time) from events_etl) time_max_test""", user=user, password=password).load()
        if time_mysql.take(1) == None:
            time_mysql = '1997-10-06 00:00:00'
        else:
            time_mysql = str(time_mysql.take(1)[0][0] + datetime.timedelta(hours=7))
        now = str(datetime.datetime.now())
        time_cassandra = spark.read.format("org.apache.spark.sql.cassandra").options(table="tracking",keyspace="keyspace_de").load().where((col('ts') >= time_mysql) & (col('ts') <= now))
        time_cassandra = time_cassandra.select('ts')
        time_cassandra = time_cassandra.agg({'ts':'max'}).take(1)[0][0]
        if time_cassandra == None:
            time_cassandra = '1997-10-06 00:00:00'
        if time_cassandra > time_mysql:
            main_task_real_time(time_mysql,time_cassandra,now)
        else:
            print("No new data found")
        print("Finish loop")

In [16]:
etl_task_realtime(is_loop=True)

+--------------------+----+----------+-----------+---+------------+-----+--------------------+---------------+--------------------+---+--------+----+------+----+------------+--------------------+---------+--------------------+----+--------------------+-------------------+------------+-----------+----------+----------+--------+---+--------+
|         create_time| bid|        bn|campaign_id| cd|custom_track|   de|                  dl|             dt|                  ed| ev|group_id|  id|job_id|  md|publisher_id|                  rl|       sr|                  ts|  tz|                  ua|                uid|utm_campaign|utm_content|utm_medium|utm_source|utm_term|  v|      vp|
+--------------------+----+----------+-----------+---+------------+-----+--------------------+---------------+--------------------+---+--------+----+------+----+------------+--------------------+---------+--------------------+----+--------------------+-------------------+------------+-----------+----------+--------

--------------------
Finalizing data from Cassandra 
--------------------
+------+------------+----------+-----+----------+-------+------+----------+---------------------+------------------------+
|job_id|publisher_id|     dates|hours|spend_hour|bid_set|clicks|conversion|qualified_application|disqualified_application|
+------+------------+----------+-----+----------+-------+------+----------+---------------------+------------------------+
|     0|           0|2022-07-06|   15|      null|   null|  null|         2|                 null|                    null|
|     0|           0|2022-07-08|   13|      null|   null|  null|         4|                    2|                       2|
|     0|           0|2022-07-09|   15|      null|   null|  null|         3|                 null|                    null|
|     0|           0|2022-07-10|    8|      null|   null|  null|         3|                 null|                    null|
|     0|           0|2022-07-11|    0|      null|   null|  null| 

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "D:\Application\spark\spark3.2\python\lib\py4j-0.10.9.5-src.zip\py4j\java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "D:\Application\spark\spark3.2\python\lib\py4j-0.10.9.5-src.zip\py4j\clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "D:\Application\Anaconda3\lib\socket.py", line 704, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 