In [1]:
import findspark
findspark.init()
import pyspark.sql.functions as sf
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark import SparkConf, SparkContext
from uuid import * 
from uuid import UUID
import time_uuid 
from pyspark.sql import Row
from pyspark.sql.functions import udf
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.window import Window as W
import datetime
import time

In [2]:
spark = SparkSession.builder.config("spark.jars.packages", "com.datastax.spark:spark-cassandra-connector_2.12:3.1.0").getOrCreate()

In [3]:
def time_process(df):
    spark_time = df.select('create_time').collect()
    normal_time = []
    for i in range(len(spark_time)):
        a = time_uuid.TimeUUID(bytes=UUID(spark_time[i][0]).bytes).get_datetime().strftime(('%Y-%m-%d %H:%M:%S'))
        normal_time.append(a)
    spark_timeuuid = []
    for i in range(len(spark_time)):
        spark_timeuuid.append(spark_time[i][0])
    time_data = spark.createDataFrame(zip(spark_timeuuid,normal_time),['create_time','ts'])
    result = df.join(time_data,['create_time'],'inner').drop(df.ts)
    result = result.select('create_time','ts','job_id','custom_track','bid','campaign_id','group_id','publisher_id')
    return result

In [4]:
def calculating_clicks(df):
    click_df = df.filter(df.custom_track == 'click')
    click_df = click_df.na.fill({'bid': 0})
    click_df = click_df.na.fill({'job_id': 0})
    click_df = click_df.na.fill({'publisher_id': 0})
    click_df = click_df.na.fill({'group_id': 0})
    click_df = click_df.na.fill({'campaign_id': 0})
    click_df.registerTempTable('clicks')
    click_output = spark.sql(""" select job_id, date(ts) as date, hour(ts) as hour, publisher_id, campaign_id, group_id, avg(bid) as bid_set, count(*) as clicks, sum(bid) as spend_hour 
    from clicks
    group by job_id , date(ts) , hour(ts) , publisher_id , campaign_id , group_id""")
    return click_output

In [5]:
def calculating_conversion(df):
    conversion_data = df.filter(df.custom_track == 'conversion')
    conversion_data = conversion_data.na.fill({'job_id':0})
    conversion_data = conversion_data.na.fill({'publisher_id':0})
    conversion_data = conversion_data.na.fill({'group_id':0})
    conversion_data = conversion_data.na.fill({'campaign_id':0})
    conversion_data.registerTempTable('conversion')
    conversion_output = spark.sql("""select job_id , date(ts) as date , hour(ts) as hour , publisher_id , campaign_id , group_id , count(*) as conversion  from conversion
    group by job_id , date(ts) , hour(ts) , publisher_id , campaign_id , group_id """)
    return conversion_output 

In [6]:
def calculating_qualified(df):    
    qualified_data = df.filter(df.custom_track == 'qualified')
    qualified_data = qualified_data.na.fill({'job_id':0})
    qualified_data = qualified_data.na.fill({'publisher_id':0})
    qualified_data = qualified_data.na.fill({'group_id':0})
    qualified_data = qualified_data.na.fill({'campaign_id':0})
    qualified_data.registerTempTable('qualified')
    qualified_output = spark.sql("""select job_id , date(ts) as date , hour(ts) as hour , publisher_id , campaign_id , group_id , count(*) as qualified  from qualified
    group by job_id , date(ts) , hour(ts) , publisher_id , campaign_id , group_id """)
    return qualified_output

In [7]:
def calculating_unqualified(df):
    unqualified_data = df.filter(df.custom_track == 'unqualified')
    unqualified_data = unqualified_data.na.fill({'job_id':0})
    unqualified_data = unqualified_data.na.fill({'publisher_id':0})
    unqualified_data = unqualified_data.na.fill({'group_id':0})
    unqualified_data = unqualified_data.na.fill({'campaign_id':0})
    unqualified_data.registerTempTable('unqualified')
    unqualified_output = spark.sql("""select job_id , date(ts) as date , hour(ts) as hour , publisher_id , campaign_id , group_id , count(*) as unqualified  from unqualified
    group by job_id , date(ts) , hour(ts) , publisher_id , campaign_id , group_id """)
    return unqualified_output

In [8]:
def process_final_data(clicks_output, conversion_output, qualified_output, unqualified_output):
    final_data = clicks_output.join(conversion_output, ['job_id', 'date', 'hour', 'publisher_id', 'campaign_id', 'group_id'], 'full')\
    .join(qualified_output, ['job_id', 'date', 'hour', 'publisher_id', 'campaign_id', 'group_id'], 'full')\
    .join(unqualified_output, ['job_id', 'date', 'hour', 'publisher_id', 'campaign_id', 'group_id'], 'full')
    return final_data

In [9]:
def process_cassandra_data(df):
    clicks = calculating_clicks(df)
    conversion = calculating_conversion(df)
    qualified = calculating_qualified(df)
    unqualified = calculating_unqualified(df)
    return process_final_data(clicks, conversion, qualified, unqualified)

In [10]:
def get_data_sql(url,driver,user,password):
    sql = """(select id as job_id, company_id, group_id, campaign_id FROM job) test"""
    company = spark.read.format('jdbc').options(url= url, driver = driver, user = user, password= password, dbtable = sql).load()
    return company

In [11]:
def import_to_mysql(output):
    final = output.select('job_id','date','hour','publisher_id','company_id','campaign_id','group_id','unqualified','qualified','conversion','clicks','bid_set','spend_hour', 'last_updated_time')
    final = final.withColumnRenamed('date', 'dates').withColumnRenamed('hour','hours').withColumnRenamed('qualified','qualified_application').\
    withColumnRenamed('unqualified','disqualified_application').withColumnRenamed('conversions','conversion')
    final = final.withColumn('sources', sf.lit('Cassandra'))
    
    final.show()
    final.write.format("jdbc") \
    .option("driver","com.mysql.cj.jdbc.Driver") \
    .option("url", "jdbc:mysql://localhost:3306/Data_Warehouse") \
    .option("dbtable", "events") \
    .mode("append") \
    .option("user", "root") \
    .option("password", "mysql") \
    .save()
    return print('Data imported successfully')
    

In [12]:
def main_task(user, password, url, driver, mysql_time):
    print("-----------------------------")
    print("Retrieve data from Cassandra")
    print("-----------------------------")

    df = spark.read.format('org.apache.spark.sql.cassandra').options(table ='tracking', keyspace = 'data_engineering').load().where(sf.col('ts') >= mysql_time)
    df.show()

    df = time_process(df)
    print("-----------------------------")
    print("Process Cassandra statistics")
    print("-----------------------------")
    data_lake = process_cassandra_data(df)
    
    print("-----------------------------")
    print("Process MYSQL statistics")
    print("-----------------------------")
    data_wh = get_data_sql(url= url, driver = driver, user = user, password=password)
    
    print("-----------------------------")
    print("Finalizing Output")
    print("-----------------------------")
    etl_data = data_lake.join(data_wh, 'job_id', 'left').drop(data_wh.campaign_id).drop(data_wh.group_id)
    etl_data = etl_data.withColumn('last_updated_time', sf.lit(datetime.datetime.now()))
    import_to_mysql(etl_data)
    print('Task finished') 

In [13]:
def get_latest_cassandra_time():
    df = spark.read.format("org.apache.spark.sql.cassandra").options(table="tracking",keyspace="data_engineering").load()
    cassandra_time = df.agg({'ts':'max'}).take(1)[0][0]
    # sql = "(select max(ts) from events limit 1) test "
    # df = spark.read.format("org.apache.spark.sql.cassandra").options(dbtable = sql,table="tracking",keyspace="data_engineering").load().take(1)[0][0]
    return cassandra_time


In [14]:
def get_latest_mysql_time(user, password, url, driver):
    sql = """(select max(last_updated_time) from events) test"""
    mysql_time = spark.read.format('jdbc').options(url=url, driver=driver, dbtable=sql, user=user, password=password).load().take(1)[0][0]
    if mysql_time is None:
        mysql_latest = '1998-01-01 23:59:59'
    else :
        mysql_latest = mysql_time.strftime('%Y-%m-%d %H:%M:%S')
    return mysql_latest
    # print(mysql_time)

In [15]:
def run():
    while True:
        host = 'localhost'
        port = '3306'
        db_name = 'Data_Warehouse'
        driver = 'com.mysql.cj.jdbc.Driver'
        user = 'root'
        password = 'mysql'
        url = 'jdbc:mysql://' + host + ':' + port + '/' + db_name
        timestart = datetime.datetime.now()
        cassandra_time = get_latest_cassandra_time()
        mysql_time = get_latest_mysql_time(user, password, url, driver)
        #compare time
        if cassandra_time > mysql_time:
            main_task(user, password, url,driver, mysql_time)
        else:
            print("Up to dated")
        timeend = datetime.datetime.now()
        print(f"Need {timeend - timestart} to finish")
        time.sleep(30)

In [16]:
run()

-----------------------------
Retrieve data from Cassandra
-----------------------------
+--------------------+----+----------+-----------+---+------------+-----+--------------------+---------------+--------------------+---+--------+----+------+----+------------+--------------------+---------+--------------------+----+--------------------+-------------------+------------+-----------+----------+----------+--------+---+--------+
|         create_time| bid|        bn|campaign_id| cd|custom_track|   de|                  dl|             dt|                  ed| ev|group_id|  id|job_id|  md|publisher_id|                  rl|       sr|                  ts|  tz|                  ua|                uid|utm_campaign|utm_content|utm_medium|utm_source|utm_term|  v|      vp|
+--------------------+----+----------+-----------+---+------------+-----+--------------------+---------------+--------------------+---+--------+----+------+----+------------+--------------------+---------+--------------------+-



-----------------------------
Process MYSQL statistics
-----------------------------
-----------------------------
Finalizing Output
-----------------------------
+------+----------+-----+------------+----------+-----------+--------+------------------------+---------------------+----------+------+-------+----------+--------------------+---------+
|job_id|     dates|hours|publisher_id|company_id|campaign_id|group_id|disqualified_application|qualified_application|conversion|clicks|bid_set|spend_hour|   last_updated_time|  sources|
+------+----------+-----+------------+----------+-----------+--------+------------------------+---------------------+----------+------+-------+----------+--------------------+---------+
|     0|2022-07-06|    9|           0|      null|          0|       0|                    null|                 null|      null|     1|    0.0|         0|2023-08-28 16:50:...|Cassandra|
|     0|2022-07-06|   15|           0|      null|          0|       0|                    nul

AttributeError: 'str' object has no attribute 'strftime'