In [2]:
from pyspark.sql import SparkSession
from cassandra.util import datetime_from_uuid1
from cassandra.cqltypes import TimeUUIDType
import uuid 
import time
from uuid import UUID
import time_uuid 
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf, col
import pyspark.sql.functions as sf
import datetime

In [3]:
spark = SparkSession.builder \
    .appName("Hien's project") \
    .config("spark.jars.packages", "mysql:mysql-connector-java:8.0.32,com.datastax.spark:spark-cassandra-connector_2.12:3.1.0") \
    .config("spark.cassandra.connection.host", "localhost") \
    .config("spark.cassandra.connection.port", "9042") \
    .getOrCreate()
# 'localhost' if Spark is outside Docker, 'cassandra' if Spark is inside Docker

:: loading settings :: url = jar:file:/opt/spark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/hien2706/.ivy2/cache
The jars for the packages stored in: /home/hien2706/.ivy2/jars
mysql#mysql-connector-java added as a dependency
com.datastax.spark#spark-cassandra-connector_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-f053457f-26f9-4cf6-8bbf-058b5fb01a67;1.0
	confs: [default]
	found mysql#mysql-connector-java;8.0.32 in central
	found com.mysql#mysql-connector-j;8.0.32 in central
	found com.google.protobuf#protobuf-java;3.21.9 in central
	found com.datastax.spark#spark-cassandra-connector_2.12;3.1.0 in central
	found com.datastax.spark#spark-cassandra-connector-driver_2.12;3.1.0 in central
	found com.datastax.oss#java-driver-core-shaded;4.12.0 in central
	found com.datastax.oss#native-protocol;1.5.0 in central
	found com.datastax.oss#java-driver-shaded-guava;25.1-jre-graal-sub-1 in central
	found com.typesafe#config;1.4.1 in central
	found org.slf4j#slf4j-api;1.7.26 in central
	found io.dropwizard.metri

In [39]:
def process_click_data(df):
    click_data = df.filter(df.custom_track == 'click')
    click_data.createOrReplaceTempView('click_data')
    return spark.sql("""select date(ts) as date,
                        hour(ts) as hour ,
                        job_id,
                        publisher_id,
                        campaign_id,
                        group_id,
                        round(avg(bid),2) as bid_set , 
                        sum(bid) as spend_hour , 
                        count(*) as click 
                        from click_data 
                        group by date(ts), hour(ts), job_id, publisher_id, campaign_id, group_id""")
    
    
def process_conversion_data(df):
    conversion_data = df.filter(df.custom_track == 'conversion')
    conversion_data.createOrReplaceTempView('conversion_data')
    return spark.sql("""select date(ts) as date,
                                hour(ts) as hour ,
                                job_id,
                                publisher_id,
                                campaign_id,
                                group_id, 
                                count(*) as conversion 
                                from conversion_data 
                                group by date(ts), hour(ts),job_id,publisher_id,campaign_id,group_id""")
    

def process_qualified_data(df):
    qualified_data = df.filter(df.custom_track == 'qualified')
    qualified_data.createOrReplaceTempView('qualified_data')

    return spark.sql("""select date(ts) as date,
                               hour(ts) as hour,
                               job_id,
                               publisher_id,
                               campaign_id,
                               group_id, 
                               count(*) as qualified 
                               from qualified_data 
                               group by date(ts),hour(ts),job_id,publisher_id,campaign_id,group_id""")
    
    
def process_unqualified_data(df):
    unqualified_data = df.filter(df.custom_track == 'unqualified')
    unqualified_data.createOrReplaceTempView('unqualified_data')
    return spark.sql("""select date(ts) as date,
                                 hour(ts) as hour ,
                                 job_id,
                                 publisher_id,
                                 campaign_id,
                                 group_id, 
                                 count(*) as unqualified 
                                 from unqualified_data 
                                 group by date(ts), hour(ts),job_id,publisher_id,campaign_id,group_id""")
    
    
def process_cassandra_data(df):
    click_data = process_click_data(df)
    conversion_data = process_conversion_data(df)
    qualified_data = process_qualified_data(df)
    unqualified_data = process_unqualified_data(df)
    
    assert click_data is not None, "click_data is None"
    assert conversion_data is not None, "conversion_data is None"
    assert qualified_data is not None, "qualified_data is None"
    assert unqualified_data is not None, "unqualified_data is None"

    final = click_data.join(conversion_data, on=['date', 'hour', 'job_id', 'publisher_id', 'campaign_id', 'group_id'], how='full') \
        .join(qualified_data, on=['date', 'hour', 'job_id', 'publisher_id', 'campaign_id', 'group_id'], how='full') \
        .join(unqualified_data, on=['date', 'hour', 'job_id', 'publisher_id', 'campaign_id', 'group_id'], how='full')
    return final
    
    
    

In [38]:
def finalize_data(df,cassandra_latest_time):
    df = df.withColumn('updated_at',sf.lit(cassandra_latest_time))
    df = df.withColumn('sources',sf.lit('Cassandra'))
    df = df.withColumnRenamed('date','dates')
    df = df.withColumnRenamed('hour','hours')
    df = df.withColumnRenamed('qualified','qualified_application')
    df = df.withColumnRenamed('unqualified','disqualified_application')
    df = df.withColumnRenamed('unqualified','disqualified_application')
    df = df.withColumnRenamed('click','clicks')
    
    return df.select(
    'job_id',
    'dates',
    'hours',
    'disqualified_application',
    'qualified_application',
    'conversion',
    'company_id',
    'group_id',
    'campaign_id',
    'publisher_id',
    'bid_set',
    'clicks',
    'spend_hour',
    'sources',
    'updated_at')

In [72]:
def filter_data(df,mysql_latest_time):
    df = df.select('ts','job_id','custom_track','bid','campaign_id','group_id','publisher_id')
    df = df.filter(df.job_id.isNotNull())
    df = df.filter(df.ts.isNotNull())
    df = df.withColumn("ts", sf.split(col("ts"), "\\.").getItem(0))
    print("After Split:")
    df.show(truncate=False)
    df = df.withColumn("ts", sf.to_timestamp(col("ts"), "yyyy-MM-dd HH:mm:ss"))
    df = df.withColumn("bid", col("bid").cast("double"))
    print(df.show(truncate=False))
    df = df.where(col('ts')> mysql_latest_time)
    return df

In [37]:
def get_data_from_mysql(host = 'localhost' ,port = '3306', db_name = 'mydatabase',
                        driver = "com.mysql.cj.jdbc.Driver", user = 'myuser', password = 'mypassword',
                        sql_query = '(SELECT id as job_id, company_id FROM job) A'):
    
    return spark.read.format('jdbc').options(url = f'jdbc:mysql://{host}:{port}/{db_name}' , 
                                             driver = driver , 
                                             dbtable = sql_query , 
                                             user=user , 
                                             password = password).load()
    

def write_data_to_mysql(df,host = 'localhost',port = '3306',
                        db_name = 'mydatabase', user = 'myuser', table = 'events',
                        password = 'mypassword', driver = "com.mysql.cj.jdbc.Driver"):
    df.write.format("jdbc") \
    .option("driver",driver) \
    .option("url", f"jdbc:mysql://{host}:{port}/{db_name}") \
    .option("dbtable", table) \
    .mode("append") \
    .option("user", user) \
    .option("password", password) \
    .save()

In [7]:
#MySQL Credentials
host = 'localhost'
port = '3306'
db_name = 'mydatabase'
user = 'myuser'
password = 'mypassword'
driver = "com.mysql.cj.jdbc.Driver"

In [74]:
def main_task(cassandra_latest_time,mysql_latest_time):

    print("Get data from cassandra")
    df = spark.read.format("org.apache.spark.sql.cassandra").options(table = 'tracking',keyspace = 'my_keyspace').load()

    print('Filter out the data')
    df = filter_data(df,mysql_latest_time)
   
    print('Process the data')
    df = df.cache()
    df = process_cassandra_data(df)
    
    print('get company data from mysql')
    jobs = get_data_from_mysql(host = host,port = port,db_name = db_name,user = user,password = password, driver = driver,
                               sql_query = '(SELECT id as job_id, company_id FROM job) A')
    
    print('Merge the data with company data')
    df = df.join(jobs,on = 'job_id',how='left')
    
    print('finalizing the data')
    df = finalize_data(df,cassandra_latest_time)
    print('final ouput')
    print(df.show(truncate=False))
    
    print('Write the data to mysql')
    write_data_to_mysql(df,host,port,db_name,user,password,driver)

In [75]:
def get_latest_time_cassandra():
    data = spark.read.format("org.apache.spark.sql.cassandra").options(table = 'tracking',keyspace = 'my_keyspace').load()
    cassandra_latest_time = data.agg({'ts':'max'}).take(1)[0][0]
    return cassandra_latest_time

def get_mysql_latest_time(host,port,db_name,driver,user,password):    
    sql_query = """(select max(updated_at) from events) data"""
    mysql_time = get_data_from_mysql(host,port,db_name,driver,user,password,sql_query)
    mysql_time = mysql_time.take(1)[0][0]
    if mysql_time is None:
        mysql_latest_time = '1970-01-01 23:59:59'
    else:
        mysql_latest_time = mysql_time.strftime('%Y-%m-%d %H:%M:%S')
    return mysql_latest_time 
while True:
    start_time = datetime.datetime.now()
    cassandra_latest_time = get_latest_time_cassandra()
    mysql_latest_time = get_mysql_latest_time(host,port,db_name,driver,user,password)

    mysql_latest_time = datetime.datetime.strptime(mysql_latest_time, '%Y-%m-%d %H:%M:%S')
    if '.' in cassandra_latest_time:
        cassandra_latest_time = cassandra_latest_time.split('.')[0]
    cassandra_latest_time = datetime.datetime.strptime(cassandra_latest_time, '%Y-%m-%d %H:%M:%S')
           
    print(f'{type(cassandra_latest_time)} :  {cassandra_latest_time}')
    print(f'{type(mysql_latest_time)} :  {mysql_latest_time}')
    
    if cassandra_latest_time > mysql_latest_time:
        print(f"bruh cassandra_latest_time: {cassandra_latest_time} > mysql_latest_time: {mysql_latest_time}")
        main_task(cassandra_latest_time,mysql_latest_time)
    else:
        print("No new data found in cassandra")
    
    end_time = datetime.datetime.now()
    execution_time = (end_time - start_time).total_seconds()
    print('Job takes {} seconds to execute'.format(execution_time))
    time.sleep(60)

24/09/14 17:45:37 WARN V2ScanPartitioningAndOrdering: Spark ignores the partitioning CassandraPartitioning. Please use KeyGroupedPartitioning for better performance
24/09/14 17:45:37 WARN V2ScanPartitioningAndOrdering: Spark ignores the partitioning CassandraPartitioning. Please use KeyGroupedPartitioning for better performance


<class 'datetime.datetime'> :  2024-09-14 17:10:48
<class 'datetime.datetime'> :  1970-01-01 23:59:59
bruh cassandra_latest_time: 2024-09-14 17:10:48 > mysql_latest_time: 1970-01-01 23:59:59
Get data from cassandra
+------------------------------------+---+----+-----------+----+------------+----+----+----+----+----+--------+----+------+----+------------+----+----+--------------------------+----+----+----+------------+-----------+----------+----------+--------+----+----+
|create_time                         |bid|bn  |campaign_id|cd  |custom_track|de  |dl  |dt  |ed  |ev  |group_id|id  |job_id|md  |publisher_id|rl  |sr  |ts                        |tz  |ua  |uid |utm_campaign|utm_content|utm_medium|utm_source|utm_term|v   |vp  |
+------------------------------------+---+----+-----------+----+------------+----+----+----+----+----+--------+----+------+----+------------+----+----+--------------------------+----+----+----+------------+-----------+----------+----------+--------+----+----+
|0da6

24/09/14 17:45:37 WARN V2ScanPartitioningAndOrdering: Spark ignores the partitioning CassandraPartitioning. Please use KeyGroupedPartitioning for better performance
24/09/14 17:45:38 WARN V2ScanPartitioningAndOrdering: Spark ignores the partitioning CassandraPartitioning. Please use KeyGroupedPartitioning for better performance
24/09/14 17:45:38 WARN V2ScanPartitioningAndOrdering: Spark ignores the partitioning CassandraPartitioning. Please use KeyGroupedPartitioning for better performance


+-------------------+------+------------+---+-----------+--------+------------+
|ts                 |job_id|custom_track|bid|campaign_id|group_id|publisher_id|
+-------------------+------+------------+---+-----------+--------+------------+
|2024-09-14 17:09:08|28    |qualified   |0.0|47         |33      |11          |
|2024-09-14 17:08:27|108   |conversion  |0.0|39         |43      |4           |
|2024-09-14 17:10:28|81    |qualified   |0.0|6          |27      |6           |
|2024-09-14 17:08:07|135   |click       |1.0|17         |30      |49          |
|2024-09-14 17:10:48|22    |unqualified |1.0|39         |6       |42          |
|2024-09-14 17:08:47|175   |unqualified |1.0|48         |14      |8           |
|2024-09-14 17:10:08|154   |unqualified |1.0|45         |27      |8           |
|2024-09-14 17:09:07|9     |unqualified |1.0|6          |5       |33          |
|2024-09-14 17:09:08|122   |conversion  |0.0|11         |48      |36          |
|2024-09-14 17:08:47|54    |qualified   

24/09/14 17:45:38 WARN V2ScanPartitioningAndOrdering: Spark ignores the partitioning CassandraPartitioning. Please use KeyGroupedPartitioning for better performance


get company data from mysql
Merge the data with company data
finalizing the data
final ouput
+------+----------+-----+------------------------+---------------------+----------+----------+--------+-----------+------------+-------+------+----------+---------+-------------------+
|job_id|     dates|hours|disqualified_application|qualified_application|conversion|company_id|group_id|campaign_id|publisher_id|bid_set|clicks|spend_hour|  sources|         updated_at|
+------+----------+-----+------------------------+---------------------+----------+----------+--------+-----------+------------+-------+------+----------+---------+-------------------+
|   103|2024-09-14|   17|                    NULL|                    1|      NULL|         1|      47|          2|          11|   NULL|  NULL|      NULL|Cassandra|2024-09-14 17:10:48|
|   106|2024-09-14|   17|                       1|                 NULL|      NULL|         1|      22|         37|          47|   NULL|  NULL|      NULL|Cassandra|202

KeyboardInterrupt: 

In [69]:
test_df = spark.createDataFrame([("2022-07-26 03:07:17.315",)], ["ts"])
test_df = test_df.withColumn("ts", sf.split(col("ts"), "\\.").getItem(0))
test_df.show(truncate=False)

+-------------------+
|ts                 |
+-------------------+
|2022-07-26 03:07:17|
+-------------------+

