In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql.functions import when
from pyspark.sql.functions import col
from pyspark.sql.types import * 
from pyspark.sql.functions import lit
import pyodbc
import pandas as pd
import pyspark.sql.functions as sf


In [2]:
# One shared SparkSession for the whole notebook, requesting a 15g driver heap.
# NOTE(review): spark.driver.memory only takes effect if it is set before the
# driver JVM starts, i.e. when this is the first session created in the kernel.
spark = (
    SparkSession.builder
    .config("spark.driver.memory", "15g")
    .getOrCreate()
)

In [3]:
def process_log_data(path, file_name, date):
    """Read one day's JSON log file and total the watch time per contract and category.

    Args:
        path: Directory prefix. It is concatenated directly with
            ``file_name``, so it is expected to end with a path separator.
        file_name: Name of the JSON log file to read.
        date: Value written into the constant ``Date`` column of every row.

    Returns:
        A DataFrame with one row per (Contract, Type, Date) and the columns
        ``Contract``, ``Type``, ``Date`` and ``TotalDuration``. The last is
        the sum of ``_source.TotalDuration``. ``Type`` is a category label
        derived from ``_source.AppName``, or ``"Error"`` when the app name is
        unrecognised or null.
    """
    # Ordered (category label, AppName values) pairs used to build the CASE WHEN.
    app_categories = (
        ("Truyền Hình", ('CHANNEL', 'DSHD', 'KPLUS', 'KPlus')),
        ("Phim Truyện", ('VOD', 'FIMS_RES', 'BHD_RES', 'VOD_RES', 'FIMS', 'BHD', 'DANET')),
        ("Giải Trí", ('RELAX',)),
        ("Thiếu Nhi", ('CHILD',)),
        ("Thể Thao", ('SPORT',)),
    )

    # CASE WHEN AppName IN (...) THEN label ... ELSE 'Error' END.
    # A null AppName makes every IN test null, so it falls through to 'Error'.
    app_name = col("AppName")
    type_expr = None
    for label, app_names in app_categories:
        matches = app_name.isin(*app_names)
        if type_expr is None:
            type_expr = when(matches, label)
        else:
            type_expr = type_expr.when(matches, label)
    type_expr = type_expr.otherwise("Error")

    raw = spark.read.json(path + file_name).select(
        '_source.AppName', '_source.Contract', '_source.Mac', '_source.TotalDuration'
    )
    labelled = (
        raw.withColumn('Date', lit(date))
           .withColumn('Type', type_expr)
           .select('Contract', 'Type', 'TotalDuration', 'Date')
    )
    return labelled.groupBy('Contract', 'Type', 'Date').agg(
        sf.sum('TotalDuration').alias('TotalDuration')
    )

In [4]:
def main_task(path='C:\\Users\\ASUS\\OneDrive\\Big_Data_Analytics\\Dataset\\',
              start_date='2022-04-01', end_date='2022-04-30'):
    """Load every daily log file in an inclusive date range and union them.

    Iterates over calendar days, so any inclusive range works, including one
    that crosses a month boundary. With the defaults it reads
    ``20220401.json`` through ``20220430.json``.

    Args:
        path: Directory prefix for the daily files. It is concatenated
            directly with the file name by ``process_log_data``, so it should
            end with a path separator. The default is a hard-coded local
            Windows folder; override it on other machines.
        start_date: First day to load, as ``'YYYY-MM-DD'`` (inclusive).
        end_date: Last day to load, as ``'YYYY-MM-DD'`` (inclusive).

    Returns:
        The union, in date order, of ``process_log_data`` for each day. Each
        day's file is ``<path>YYYYMMDD.json`` and its ``Date`` value is
        ``'YYYY-MM-DD'``.

    Raises:
        ValueError: If a date is not in ``'YYYY-MM-DD'`` form or
            ``end_date`` is before ``start_date``.
    """
    from datetime import datetime, timedelta
    from functools import reduce

    first_day = datetime.strptime(start_date, '%Y-%m-%d').date()
    last_day = datetime.strptime(end_date, '%Y-%m-%d').date()
    if last_day < first_day:
        raise ValueError(
            'end_date {} is before start_date {}'.format(end_date, start_date))

    n_days = (last_day - first_day).days + 1
    daily_frames = []
    for offset in range(n_days):
        day = first_day + timedelta(days=offset)
        # strftime zero-pads month and day, so no separate <10 / >=10 handling.
        daily_frames.append(process_log_data(
            path, day.strftime('%Y%m%d') + '.json', day.strftime('%Y-%m-%d')))

    # Left fold: ((d1 ∪ d2) ∪ d3) ∪ ... in date order.
    return reduce(lambda acc, frame: acc.union(frame), daily_frames)

In [8]:
def process_daily_statistic(df):
    """Summarise usage per day and content category.

    Args:
        df: Frame shaped like the output of ``process_log_data`` /
            ``main_task``. It must have ``Date``, ``Type``, ``Contract`` and
            ``TotalDuration`` columns.

    Returns:
        A DataFrame with one row per (Date, Type) and the columns ``Date``,
        ``Type``, ``TotalDuration`` and ``TotalUsers``. ``TotalDuration`` is
        the summed duration divided by 3600. ``TotalUsers`` is the number of
        distinct non-null contracts.
    """
    # Explicit aliases instead of the dict form of agg. This avoids renaming
    # Spark-generated names such as 'sum(TotalDuration)' and fixes the output
    # column order.
    daily = df.groupBy('Date', 'Type').agg(
        sf.sum('TotalDuration').alias('TotalDuration'),
        # Distinct, so a contract with several rows for the same day and
        # category is counted once as a user.
        sf.countDistinct('Contract').alias('TotalUsers'),
    )
    # NOTE(review): /3600 presumably converts seconds to hours; confirm the
    # unit of TotalDuration in the source logs.
    return daily.withColumn('TotalDuration', col('TotalDuration') / 3600)


In [5]:
# Load every daily log file into one frame of per-day totals, keyed by
# (Contract, Type, Date).
df = main_task() 

In [6]:
# Discard the per-day key; the next step re-aggregates each contract's
# durations across all loaded days.
df = df.drop('Date')

In [7]:
# Collapse the per-day rows into one total duration per (Contract, Type).
df = df.groupBy('Contract', 'Type').agg(
    sf.sum('TotalDuration').alias('TotalDuration')
)

In [9]:
# Keep only rows whose AppName mapped to a known category.
df = df.where(col('Type') != 'Error')

In [12]:
# Column names of the per-contract, per-category totals.
df.schema.names

['Contract', 'Type', 'TotalDuration']

In [14]:
# One frame per content category. Type is constant within each frame, so it
# is dropped. Order matches the unpacking targets below.
_CATEGORY_LABELS = ('Truyền Hình', 'Thể Thao', 'Phim Truyện', 'Thiếu Nhi', 'Giải Trí')
TV, Sport, Movie, Child, Relax = [
    df.where(col('Type') == label).drop('Type') for label in _CATEGORY_LABELS
]

In [16]:
# Give each category its own duration column so the frames can be joined
# side by side on Contract without a name clash.
TV, Sport, Child, Relax, Movie = [
    frame.withColumnRenamed('TotalDuration', new_name)
    for frame, new_name in (
        (TV, 'TVDuration'),
        (Sport, 'SportDuration'),
        (Child, 'ChildDuration'),
        (Relax, 'RelaxDuration'),
        (Movie, 'MovieDuration'),
    )
]

In [17]:
# Full outer joins keep contracts that used only some categories; the
# categories a contract did not use come out as null durations.
result = TV
for other in (Sport, Movie, Child, Relax):
    result = result.join(other, on=['Contract'], how='full')

In [19]:
# NOTE(review): `result` holds each contract's durations summed over every
# daily file main_task() loaded (2022-04-01 through 2022-04-30 with its
# defaults). This single date therefore presumably labels the whole April
# period rather than one day's data; confirm with the consumers of the output.
report_date = '2022-04-01'
result = result.withColumn("Date", lit(report_date))

In [20]:
# Print the joined schema: Contract, one nullable duration column per
# category, and the constant Date column.
result.printSchema()

root
 |-- Contract: string (nullable = true)
 |-- TVDuration: long (nullable = true)
 |-- SportDuration: long (nullable = true)
 |-- MovieDuration: long (nullable = true)
 |-- ChildDuration: long (nullable = true)
 |-- RelaxDuration: long (nullable = true)
 |-- Date: string (nullable = false)



In [21]:
# Preview the first 10 rows; null means the contract did not use that category.
result.show(n=10)

+---------+----------+-------------+-------------+-------------+-------------+----------+
| Contract|TVDuration|SportDuration|MovieDuration|ChildDuration|RelaxDuration|      Date|
+---------+----------+-------------+-------------+-------------+-------------+----------+
|AGAAA0848|     12141|         null|         null|         null|         null|2022-04-01|
|AGAAA1147|   1299590|         null|         null|         null|          103|2022-04-01|
|AGAAA1282|    168792|         null|         null|          427|         null|2022-04-01|
|AGAAA1992|     75782|         null|         null|         null|         null|2022-04-01|
|AGAAA2588|   1078595|         null|         null|         null|         null|2022-04-01|
|AGAAA3255|     21636|         null|           22|           36|         null|2022-04-01|
|AGD003807|      4352|         null|       153369|          164|         null|2022-04-01|
|AGD004253|    264972|         null|         null|         null|         null|2022-04-01|
|AGD008179

In [22]:
# Spark writes a *directory* named "T4Duration.csv", not a standalone CSV file.
# The directory holds one part-*.csv file (coalesce(1) leaves a single
# partition) and typically marker files such as _SUCCESS. Null durations are
# written as empty fields.
# Overwrite mode lets the cell be re-run under Restart & Run All; the default
# save mode raises an error once the output path already exists.
result.coalesce(1).write.mode("overwrite").option("header", "true").csv("T4Duration.csv")