# <center>Project: ETL of Customer Behavior Data</center>

---

In [1]:
from pyspark.sql.session import SparkSession
from pyspark.sql.functions import *
import pandas as pd 
import os
from pyspark.sql.functions import expr
from pyspark.sql.window import Window
from pyspark.sql.functions import rank
from pyspark.sql.functions import lit
import pyspark.sql.functions as sf


In [2]:
# Start (or reuse) a Spark session with 8g of driver memory and the MySQL
# JDBC connector added through spark.jars.packages (used by the final push).
spark = (
    SparkSession.builder
    .config("spark.driver.memory", "8g")
    .config("spark.jars.packages", "mysql:mysql-connector-java:8.0.17")
    .getOrCreate()
)

## Connect to HDFS and list the data file names

In [3]:
# Hadoop classes reached through Spark's JVM gateway.
URI           = spark._jvm.java.net.URI
Path          = spark._jvm.org.apache.hadoop.fs.Path
FileSystem    = spark._jvm.org.apache.hadoop.fs.FileSystem
Configuration = spark._jvm.org.apache.hadoop.conf.Configuration

# Single source of truth for the cluster address and the log directory.
HDFS_URI = 'hdfs://localhost:9900'
LOG_DIR = '/user/log_content'

fs = FileSystem.get(URI(HDFS_URI), Configuration())
status = fs.listStatus(Path(LOG_DIR))
# Keep only regular *.json files (skip sub-directories or marker files that
# are not daily logs) and process them in file-name (i.e. date) order rather
# than relying on the listing order.
Path_link = sorted(
    file_status.getPath().getName()
    for file_status in status
    if file_status.isFile() and file_status.getPath().getName().endswith('.json')
)
# Prefix that ETL_1day prepends to each file name; the trailing slash is required.
path_HDFS = HDFS_URI + LOG_DIR + '/'

In [4]:
# Notebook display of the input file names discovered in HDFS.
Path_link

['20220401.json',
 '20220402.json',
 '20220403.json',
 '20220404.json',
 '20220405.json',
 '20220406.json',
 '20220407.json',
 '20220408.json',
 '20220409.json',
 '20220410.json',
 '20220411.json',
 '20220412.json',
 '20220413.json',
 '20220414.json',
 '20220415.json',
 '20220416.json',
 '20220417.json',
 '20220418.json',
 '20220419.json',
 '20220420.json',
 '20220421.json',
 '20220422.json',
 '20220423.json',
 '20220424.json',
 '20220425.json',
 '20220426.json',
 '20220427.json',
 '20220428.json',
 '20220429.json',
 '20220430.json']

## ETL for a single day

In [5]:
def ETL_1day(pathHDFS,filename):
    """Aggregate one day of viewing logs into per-contract, per-category totals.

    Reads the JSON file ``pathHDFS + filename`` and keeps the fields nested
    under ``_source``. It then maps each ``AppName`` to a content category
    and drops rows whose ``AppName`` is not mapped. Finally it sums
    ``TotalDuration`` for every (``Contract``, ``Category``) pair.

    Args:
        pathHDFS: Directory prefix, including the trailing separator.
        filename: File name appended to ``pathHDFS``.

    Returns:
        DataFrame with columns ``Contract``, ``Category`` and ``TotalDuration``.
    """
    # AppName values for each category. Category labels are Vietnamese:
    # Truyền Hình = TV, Phim Truyện = movies, Giải Trí = entertainment,
    # Thiếu Nhi = children, Thể Thao = sports.
    tv_apps = ['CHANNEL', 'DSHD', 'KPLUS', 'KPlus']
    movie_apps = ['VOD', 'FIMS_RES', 'BHD_RES', 'VOD_RES', 'FIMS', 'BHD', 'DANET']
    app = sf.col('AppName')
    category = (sf.when(app.isin(tv_apps), "Truyền Hình")
                .when(app.isin(movie_apps), "Phim Truyện")
                .when(app == 'RELAX', "Giải Trí")
                .when(app == 'CHILD', "Thiếu Nhi")
                .when(app == 'SPORT', "Thể Thao")
                .otherwise("Error"))  # unmapped (or NULL) AppName

    data = spark.read.json(pathHDFS + filename).select('_source.*')
    data = (data.withColumn('Category', category)
                .select('Contract', 'Category', 'TotalDuration')
                .filter(sf.col('Category') != 'Error'))
    # Group on 'Category' with the same casing the callers use, and sum only
    # TotalDuration explicitly instead of every numeric column.
    return (data.groupBy('Contract', 'Category')
                .agg(sf.sum('TotalDuration').alias('TotalDuration')))

In [6]:
# Preview the aggregated result for the first input file (first 20 rows).
ETL_1day(path_HDFS, Path_link[0]).show(n=20)

+---------+-----------+-------------+
| Contract|   category|TotalDuration|
+---------+-----------+-------------+
|DNH014998|Phim Truyện|         3365|
|HND486882|Phim Truyện|         5545|
|HUFD07189|Truyền Hình|         2264|
|HDFD36288|Truyền Hình|        11904|
|CTFD04401|Truyền Hình|        55881|
|HNH954607|Phim Truyện|        13115|
|HNH855959|Truyền Hình|          327|
|SGH034683|Truyền Hình|        82195|
|NTFD35330|Truyền Hình|        19139|
|NTFD48198|Phim Truyện|        55202|
|HNH443856|Truyền Hình|         7687|
|NAFD05338|Truyền Hình|        81934|
|LCFD20510|Phim Truyện|        10852|
|QNFD29007|Truyền Hình|        82705|
|SGH569599|Truyền Hình|        18769|
|SGH701752|Truyền Hình|        31028|
|HNH712164|Phim Truyện|         6106|
|PYFD01920|Truyền Hình|        82664|
|HTFD13716|Truyền Hình|        18759|
|SGJ039473|Truyền Hình|        18695|
+---------+-----------+-------------+
only showing top 20 rows



## Create derived columns and run the 30-day ETL

### Create TotalDuration column

In [7]:
def Total_Duration(df):
    """Return ``Contract`` together with the row-wise sum of all other columns.

    Every column after the first one (``Contract``) is treated as a numeric
    duration, and their sum is returned as ``TotalDuration``. The columns
    are added as Column objects instead of through a SQL string passed to
    ``expr``. That way, names that would need back-quoting in SQL (spaces,
    reserved words) still work. With no duration columns the total is 0
    rather than a SQL parse error.

    Returns:
        DataFrame with columns ``Contract`` and ``TotalDuration``.
    """
    from functools import reduce
    import operator

    duration_cols = [sf.col(name) for name in df.columns[1:]]
    total = reduce(operator.add, duration_cols) if duration_cols else sf.lit(0)
    return df.withColumn('TotalDuration', total).select('Contract', 'TotalDuration')

### Create Most_watch column

In [8]:
def MostWatch(df):
    """Label each contract with the category it watched the most.

    ``df`` must have ``Contract`` as its first column, followed by the
    per-category duration columns. The columns ``RelaxDuration``,
    ``MovieDuration``, ``ChildDuration``, ``SportDuration`` and
    ``TvDuration`` are shortened to ``Relax``, ``Movie``, ``Child``,
    ``Sport`` and ``Tv``. The (shortened) name of the column holding the
    largest value becomes ``Most_watch``. Ties go to the earliest column,
    and a contract whose largest duration is 0 is labelled ``'No'``.

    Returns:
        DataFrame with columns ``Contract`` and ``Most_watch``.
    """
    short_names = {
        'RelaxDuration': 'Relax',
        'MovieDuration': 'Movie',
        'ChildDuration': 'Child',
        'SportDuration': 'Sport',
        'TvDuration': 'Tv',
    }
    renamed = df
    for long_name, short_name in short_names.items():
        renamed = renamed.withColumnRenamed(long_name, short_name)

    # Only the duration columns are candidates. The previous eval-built chain
    # also compared `Contract` (a string) and the helper `max` column.
    category_cols = renamed.columns[1:]
    if len(category_cols) == 1:
        max_value = sf.col(category_cols[0])  # greatest() needs >= 2 columns
    else:
        max_value = sf.greatest(*[sf.col(c) for c in category_cols])
    renamed = renamed.withColumn('max', max_value)

    # Check the all-zero case first, then pick the first column equal to max.
    label = sf.when(sf.col('max') == 0, sf.lit('No'))
    for c in category_cols:
        label = label.when(sf.col(c) == sf.col('max'), sf.lit(c))

    return renamed.withColumn('Most_watch', label).select('Contract', 'Most_watch')


### Create Tase (taste) column

In [9]:
def Tase(df):
    """Describe which categories each contract watched at all.

    Each of ``RelaxDuration``, ``MovieDuration``, ``ChildDuration``,
    ``SportDuration`` and ``TvDuration`` that is non-zero contributes its
    label (``Relax``, ``Movie``, ``Child``, ``Sport``, ``Tv``). The labels
    of all columns after ``Contract`` are joined with ``'_'`` (e.g.
    ``'Relax_Tv'``). A contract with no non-zero duration gets ``'No'``.

    Returns:
        DataFrame with columns ``Contract`` and ``Tase``.
    """
    labels = [
        ('RelaxDuration', 'Relax'),
        ('MovieDuration', 'Movie'),
        ('ChildDuration', 'Child'),
        ('SportDuration', 'Sport'),
        ('TvDuration', 'Tv'),
    ]
    tase = df
    for col_name, label in labels:
        # `when` without `otherwise` yields NULL for zero durations, and
        # concat_ws skips NULLs. The previous version returned "" for Tv,
        # which concat_ws keeps, producing a trailing '_' such as "Relax_".
        tase = tase.withColumn(col_name, sf.when(sf.col(col_name) != 0, label))
    tase = tase.withColumn('Tase', sf.concat_ws('_', *tase.columns[1:]))
    # concat_ws returns '' when every input is NULL.
    tase = tase.withColumn('Tase', sf.when(sf.col('Tase') == '', 'No').otherwise(sf.col('Tase')))
    return tase.select('Contract', 'Tase')

### Create Activeness column

In [10]:
def Actieness(df):
    """Count how many rows each ``Contract`` has in ``df``.

    ``ETL_30_day`` passes in the union of each input file's distinct
    contracts, so the count is the number of daily files a contract appears
    in.

    Returns:
        DataFrame with columns ``Contract`` and ``Activeness``.
    """
    per_contract = df.groupBy('Contract')
    return per_contract.count().withColumnRenamed('count', 'Activeness')

### ETL for 30 days

In [11]:
def ETL_30_day(path_hdfs=None, file_names=None,
               output_path='E:\\CV\\DoAn1\\ETL_1_month',
               report_date='2023-09-14', write_mode='errorifexists'):
    """Run the daily ETL over every input file and build one summary row per contract.

    For each file, ``ETL_1day`` produces per-(Contract, Category) durations.
    The daily results are unioned and pivoted so that each category becomes a
    duration column. They are then joined with ``TotalDuration``,
    ``Most_watch`` and ``Tase``, and with ``Activeness`` (the number of input
    files in which the contract has at least one categorised record). A
    constant ``Date`` column is added. The result is written as a single
    CSV part file with a header row to ``output_path``.

    Args:
        path_hdfs: Directory prefix passed to ``ETL_1day``; defaults to the
            module-level ``path_HDFS``.
        file_names: File names to process; defaults to the module-level
            ``Path_link``.
        output_path: Destination directory for the CSV output.
        report_date: Value stored in the ``Date`` column.
        write_mode: Spark save mode for the CSV write. The default
            ``'errorifexists'`` is Spark's own default, so re-running fails
            while the output directory exists; pass ``'overwrite'`` to replace it.

    Returns:
        The summary DataFrame.

    Raises:
        ValueError: If there are no input files.
    """
    from functools import reduce

    if path_hdfs is None:
        path_hdfs = path_HDFS
    if file_names is None:
        file_names = Path_link
    if not file_names:
        raise ValueError('ETL_30_day needs at least one input file')

    daily_results = []
    daily_contracts = []
    for file_name in file_names:
        day = ETL_1day(path_hdfs, file_name)
        daily_results.append(day)
        # One row per contract per file, so counting rows later gives the
        # number of files (days) in which the contract was active.
        daily_contracts.append(day.select('Contract').distinct())
        print('Job Finished {}'.format(file_name))

    all_days = reduce(lambda left, right: left.union(right), daily_results)
    activeness = Actieness(reduce(lambda left, right: left.union(right), daily_contracts))

    # A single pivot-sum is enough; re-summing the per-day sums beforehand
    # gives the same totals.
    durations = all_days.groupBy('Contract').pivot('Category').sum('TotalDuration')
    # Vietnamese category labels -> English duration column names.
    category_columns = {
        'Giải Trí': 'RelaxDuration',
        'Phim Truyện': 'MovieDuration',
        'Thiếu Nhi': 'ChildDuration',
        'Thể Thao': 'SportDuration',
        'Truyền Hình': 'TvDuration',
    }
    for category, column_name in category_columns.items():
        durations = durations.withColumnRenamed(category, column_name)
    # Pivot leaves NULL where a contract never watched a category.
    durations = durations.fillna(0)

    # Derive the extra columns from the pivoted durations *before* joining,
    # because Total_Duration, MostWatch and Tase treat every column after
    # `Contract` as a duration.
    KQ = (durations
          .join(Total_Duration(durations), 'Contract', 'inner')
          .join(MostWatch(durations), 'Contract', 'inner')
          .join(Tase(durations), 'Contract', 'inner')
          .join(activeness, 'Contract', 'inner')
          .withColumn('Date', sf.lit(report_date)))
    KQ.repartition(1).write.mode(write_mode).csv(output_path, header=True)
    print('job Finished succesfull')
    return KQ

In [12]:
# Run the full 30-day ETL: prints progress per file and writes the CSV output.
KQ = ETL_30_day()

Job Finished 20220401.json
Job Finished 20220402.json
Job Finished 20220403.json
Job Finished 20220404.json
Job Finished 20220405.json
Job Finished 20220406.json
Job Finished 20220407.json
Job Finished 20220408.json
Job Finished 20220409.json
Job Finished 20220410.json
Job Finished 20220411.json
Job Finished 20220412.json
Job Finished 20220413.json
Job Finished 20220414.json
Job Finished 20220415.json
Job Finished 20220416.json
Job Finished 20220417.json
Job Finished 20220418.json
Job Finished 20220419.json
Job Finished 20220420.json
Job Finished 20220421.json
Job Finished 20220422.json
Job Finished 20220423.json
Job Finished 20220424.json
Job Finished 20220425.json
Job Finished 20220426.json
Job Finished 20220427.json
Job Finished 20220428.json
Job Finished 20220429.json
Job Finished 20220430.json
job Finished succesfull


## Push data to the MySQL database

In [31]:
# Take at most 10,000 summary rows for the MySQL push. No ordering is applied,
# so which rows are taken is not guaranteed.
Data_MYSQL = KQ.limit(num=10000)


In [29]:
# MySQL connection settings. Each value can be overridden through an
# environment variable so credentials need not be kept in the notebook.
# NOTE(review): the fallback password is still committed in source; drop it
# once MYSQL_PASSWORD is set wherever this notebook runs.
mysql_host = os.environ.get('MYSQL_HOST', 'localhost')
mysql_port = os.environ.get('MYSQL_PORT', '3307')
mysql_database = os.environ.get('MYSQL_DATABASE', 'DataEngineer')
url = f'jdbc:mysql://{mysql_host}:{mysql_port}/{mysql_database}'
driver = "com.mysql.cj.jdbc.Driver"
user = os.environ.get('MYSQL_USER', 'root')
password = os.environ.get('MYSQL_PASSWORD', '1234')


In [30]:
# Append the selected rows to the Customer_Behavior table over JDBC.
(Data_MYSQL.write
    .format('jdbc')
    .options(url=url, driver=driver, dbtable='Customer_Behavior',
             user=user, password=password)
    .mode('append')
    .save())