In [1]:
import findspark
findspark.init()
import os
import time
import datetime
import pyspark.sql.functions as sf
from uuid import *
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql.functions import when
from pyspark.sql.functions import col
from pyspark.sql.types import *
from pyspark.sql.functions import lit
from pyspark import SparkConf, SparkContext
from uuid import * 
from uuid import UUID
import time_uuid 
from pyspark.sql import Row
from pyspark.sql.functions import udf
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.window import Window as W

In [2]:
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

In [3]:
spark = SparkSession.builder.config("spark.jars.packages",'com.datastax.spark:spark-cassandra-connector_2.12:3.1.0').getOrCreate()

In [4]:
df = spark.read.format("org.apache.spark.sql.cassandra").options(table='tracking',keyspace='mydata').load()

In [5]:
df.show(5)
print(df.count())

+--------------------+----+----------+-----------+---+------------+-----+--------------------+---------------+--------------------+---+--------+----+------+----+------------+--------------------+---------+--------------------+----+--------------------+-------------------+------------+-----------+----------+----------+--------+---+--------+
|         create_time| bid|        bn|campaign_id| cd|custom_track|   de|                  dl|             dt|                  ed| ev|group_id|  id|job_id|  md|publisher_id|                  rl|       sr|                  ts|  tz|                  ua|                uid|utm_campaign|utm_content|utm_medium|utm_source|utm_term|  v|      vp|
+--------------------+----+----------+-----------+---+------------+-----+--------------------+---------------+--------------------+---+--------+----+------+----+------------+--------------------+---------+--------------------+----+--------------------+-------------------+------------+-----------+----------+--------

In [6]:
host ='localhost'
port='3306'
db_name='mydata'
user='root'
password='1'
url='jdbc:mysql://' + host +':' + port +'/' + db_name
driver="com.mysql.cj.jdbc.Driver"
sql="""(SELECT id as job_id, company_id FROM job) test """
mysql_data = spark.read.format('jdbc').options(url=url, driver=driver, dbtable=sql, user=user, password=password).load()

In [7]:
def process_timeuuid(df):
    spark_time = df.select('create_time').collect()
    normal_time = []
    for i in range(len(spark_time)):
        a = time_uuid.TimeUUID(bytes = UUID(spark_time[i][0]).bytes).get_datetime().strftime('%Y-%m-%d %H:%M:%S')
        normal_time.append(a)
    spark_timeuuid = []
    for i in range(len(spark_time)):
        spark_timeuuid.append(spark_time[i][0])
    time_data = spark.createDataFrame(zip(spark_timeuuid,normal_time),['create_time','ts'])
    result = df.join(time_data,['create_time'],'inner').drop(df.ts)
    result = result.select('create_time','ts','bid','job_id','campaign_id','custom_track','group_id','publisher_id')
    return result

In [8]:
process_timeuuid(df).show()

+--------------------+-------------------+----+------+-----------+------------+--------+------------+
|         create_time|                 ts| bid|job_id|campaign_id|custom_track|group_id|publisher_id|
+--------------------+-------------------+----+------+-----------+------------+--------+------------+
|ebbecd00-018d-11e...|2022-07-12 02:54:17|null|  null|       null|       click|    null|        null|
|5047c9c0-00c6-11e...|2022-07-11 03:05:26|null|  null|       null|        null|    null|        null|
|a54b7240-0d60-11e...|2022-07-27 04:00:25|null|  null|       null|       alive|    null|        null|
|c87ad3f0-00ef-11e...|2022-07-11 08:02:17|null|  null|       null|        null|    null|        null|
|be352470-0cae-11e...|2022-07-26 06:46:57|   1|   258|         93|       click|    null|           1|
|cd00c920-0c93-11e...|2022-07-26 03:34:05|null|  null|       null|       alive|    null|        null|
|da5200d0-0c8e-11e...|2022-07-26 02:58:40|   0|  1531|        222|        null|   

In [9]:
process_df=process_timeuuid(df)

In [10]:
def calculating_clicks(process_df):
    clicks_data = process_df.filter(process_df.custom_track == 'click')
    clicks_data = clicks_data.na.fill({'bid':0})
    clicks_data = clicks_data.na.fill({'job_id':0})
    clicks_data = clicks_data.na.fill({'campaign_id':0})
    clicks_data = clicks_data.na.fill({'group_id':0})
    clicks_data = clicks_data.na.fill({'publisher_id':0})
    clicks_data.registerTempTable('clicks')
    clicks_output = spark.sql("""with cte1 as (select create_time , ts , date(ts) as Date , hour(ts) as hour,
    bid,job_id,campaign_id,group_id,publisher_id from clicks)
    select job_id,publisher_id,date,hour,campaign_id,group_id ,
    sum(bid) as spend_hour , count(create_time) as clicks , avg(bid) as bid_set 
    from cte1
    group by job_id,publisher_id,date,hour,campaign_id,group_id""")
    return clicks_output 


In [11]:
def calculating_conversion(process_df):
    conversion_data = process_df.filter(process_df.custom_track == 'conversion')
    conversion_data = conversion_data.na.fill({'job_id':0})
    conversion_data = conversion_data.na.fill({'campaign_id':0})
    conversion_data = conversion_data.na.fill({'group_id':0})
    conversion_data = conversion_data.na.fill({'publisher_id':0})
    conversion_data.registerTempTable('conversion_data')
    conversion_data = spark.sql("""with cte1 as (select create_time , ts , date(ts) as Date , hour(ts) as hour
    ,job_id,campaign_id,group_id,publisher_id from clicks)
    select job_id,publisher_id,date,hour,campaign_id,group_id ,
    count(create_time) as conversion 
    from cte1
    group by job_id,publisher_id,date,hour,campaign_id,group_id""")
    return conversion_data

In [12]:
calculating_clicks(process_df).show()



+------+------------+----------+----+-----------+--------+----------+------+-------+
|job_id|publisher_id|      date|hour|campaign_id|group_id|spend_hour|clicks|bid_set|
+------+------------+----------+----+-----------+--------+----------+------+-------+
|     0|           0|2022-07-22|   2|          0|       0|         0|     9|    0.0|
|     0|           0|2022-07-21|   5|          0|       0|         0|     9|    0.0|
|     0|           0|2022-07-13|  10|          0|       0|         0|    10|    0.0|
|     0|           0|2022-07-15|   2|          0|       0|         0|     3|    0.0|
|     0|           0|2022-07-11|   1|          0|       0|         0|     4|    0.0|
|     0|           0|2022-07-20|  15|          0|       0|         0|     2|    0.0|
|     0|           0|2022-07-12|  13|          0|       0|         0|     1|    0.0|
|     0|           0|2022-07-21|  12|          0|       0|         0|     3|    0.0|
|     0|           0|2022-07-11|   7|          0|       0|       

In [13]:
def calculating_qualified(process_df):
    conversion_data = process_df.filter(process_df.custom_track == 'qualified')
    conversion_data = conversion_data.na.fill({'job_id':0})
    conversion_data = conversion_data.na.fill({'campaign_id':0})
    conversion_data = conversion_data.na.fill({'group_id':0})
    conversion_data = conversion_data.na.fill({'publisher_id':0})
    conversion_data.registerTempTable('conversion_data')
    conversion_data = spark.sql("""with cte1 as (select create_time , ts , date(ts) as Date , hour(ts) as hour
    ,job_id,campaign_id,group_id,publisher_id from clicks)
    select job_id,publisher_id,date,hour,campaign_id,group_id ,
    count(create_time) as qualified 
    from cte1
    group by job_id,publisher_id,date,hour,campaign_id,group_id""")
    return conversion_data

In [14]:
def calculating_unqualified(process_df):
    conversion_data = process_df.filter(process_df.custom_track == 'unqualified')
    conversion_data = conversion_data.na.fill({'job_id':0})
    conversion_data = conversion_data.na.fill({'campaign_id':0})
    conversion_data = conversion_data.na.fill({'group_id':0})
    conversion_data = conversion_data.na.fill({'publisher_id':0})
    conversion_data.registerTempTable('conversion_data')
    conversion_data = spark.sql("""with cte1 as (select create_time , ts , date(ts) as Date , hour(ts) as hour
    ,job_id,campaign_id,group_id,publisher_id from clicks)
    select job_id,publisher_id,date,hour,campaign_id,group_id ,
    count(create_time) as unqualified 
    from cte1
    group by job_id,publisher_id,date,hour,campaign_id,group_id""")
    return conversion_data

In [15]:
def process_cassandra_output(process_df):
    clicks_output = calculating_clicks(process_df)
    conversion_output = calculating_conversion(process_df)
    qualified_output = calculating_qualified(process_df)
    unqualified_output = calculating_unqualified(process_df)
    cassandra_output = clicks_output.join(conversion_output,['job_id','publisher_id','campaign_id','group_id','date','hour'],'full').\
    join(qualified_output,['job_id','publisher_id','campaign_id','group_id','date','hour'],'full').\
    join(unqualified_output,['job_id','publisher_id','campaign_id','group_id','date','hour'],'full')
    return cassandra_output 

In [16]:
cassandra_output=process_cassandra_output(process_df)

In [17]:
cassandra_output.show()

+------+------------+-----------+--------+----------+----+----------+------+-------+----------+---------+-----------+
|job_id|publisher_id|campaign_id|group_id|      date|hour|spend_hour|clicks|bid_set|conversion|qualified|unqualified|
+------+------------+-----------+--------+----------+----+----------+------+-------+----------+---------+-----------+
|     0|           0|          0|       0|2022-07-06|   9|         0|     1|    0.0|         1|        1|          1|
|     0|           0|          0|       0|2022-07-07|   2|         0|     3|    0.0|         3|        3|          3|
|     0|           0|          0|       0|2022-07-07|   3|         0|     2|    0.0|         2|        2|          2|
|     0|           0|          0|       0|2022-07-08|   2|         0|    12|    0.0|        12|       12|         12|
|     0|           0|          0|       0|2022-07-08|   4|         0|     3|    0.0|         3|        3|          3|
|     0|           0|          0|       0|2022-07-08|   

In [22]:
def mapping_mysql_field(cassandra_output,url,driver,user,password):
    sql = """(SELECT id as job_id,group_id,company_id FROM job) test """
    company_id=spark.read.format('jdbc').options(url=url, driver=driver, dbtable=sql, user=user, password=password).load()
    output=cassandra_output.join(company_id, cassandra_output.job_id == company_id.job_id, 'left').drop(company_id.job_id).drop(company_id.group_id)
    return output

In [24]:
output = mapping_mysql_field(cassandra_output,url,driver,user,password)

In [28]:
output.show()

+------+------------+-----------+--------+----------+----+----------+------+-------+----------+---------+-----------+----------+
|job_id|publisher_id|campaign_id|group_id|      date|hour|spend_hour|clicks|bid_set|conversion|qualified|unqualified|company_id|
+------+------------+-----------+--------+----------+----+----------+------+-------+----------+---------+-----------+----------+
|     0|           0|          0|       0|2022-07-06|   9|         0|     1|    0.0|         1|        1|          1|      null|
|     0|           0|          0|       0|2022-07-07|   2|         0|     3|    0.0|         3|        3|          3|      null|
|     0|           0|          0|       0|2022-07-07|   3|         0|     2|    0.0|         2|        2|          2|      null|
|     0|           0|          0|       0|2022-07-08|   2|         0|    12|    0.0|        12|       12|         12|      null|
|     0|           0|          0|       0|2022-07-08|   4|         0|     3|    0.0|         3|  

In [25]:
output.printSchema()

root
 |-- job_id: integer (nullable = true)
 |-- publisher_id: integer (nullable = true)
 |-- campaign_id: integer (nullable = true)
 |-- group_id: integer (nullable = true)
 |-- date: date (nullable = true)
 |-- hour: integer (nullable = true)
 |-- spend_hour: long (nullable = true)
 |-- clicks: long (nullable = true)
 |-- bid_set: double (nullable = true)
 |-- conversion: long (nullable = true)
 |-- qualified: long (nullable = true)
 |-- unqualified: long (nullable = true)
 |-- company_id: integer (nullable = true)



In [26]:
def import_data_to_mysql(url,driver,user,password):
    output.write.format("jdbc") \
    .option("driver","com.mysql.cj.jdbc.Driver") \
    .option("url", "jdbc:mysql://localhost:3306/mydata") \
    .option("dbtable", "event") \
    .mode("append") \
    .option("user", "root") \
    .option("password", "1") \
    .save()
    return print('Successfully')

In [27]:
import_data_to_mysql(url,driver,user,password)

Successfully
