In [11]:
# findspark.init() is called before any pyspark import (see the next cell)
# so that the pyspark package can be located and imported.
import findspark
findspark.init()

In [12]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.functions import when
import pyspark.sql.functions as sf
import os 

In [13]:
# Build (or reuse) the notebook-wide Spark session with explicit
# driver-memory and executor-core settings.
spark = (
    SparkSession.builder
    .config("spark.driver.memory", "8g")
    .config("spark.executor.cores", 8)
    .getOrCreate()
)

In [14]:
def combine_Table(path):
    """Read every JSON file in ``path`` and union them into one DataFrame.

    Parameters
    ----------
    path : str
        Directory containing the JSON files. A trailing path separator is
        optional.

    Returns
    -------
    pyspark.sql.DataFrame
        Union of all files, read in sorted file-name order, with ``cache()``
        applied once to the final result.

    Raises
    ------
    IndexError
        If ``path`` contains no entries. The exception type is the one the
        function has always raised for an empty directory; only the message
        is new.
    """
    # os.listdir returns entries in arbitrary order; sorting makes the union
    # order reproducible between runs.
    file_names = sorted(os.listdir(path))
    if not file_names:
        raise IndexError(f"no input files found in {path!r}")

    # os.path.join inserts a separator only when one is missing, so callers
    # are not forced to pass a path that ends with '/'.
    combined = spark.read.json(os.path.join(path, file_names[0]))
    for file_name in file_names[1:]:
        combined = combined.union(spark.read.json(os.path.join(path, file_name)))

    # Cache only the final result: caching inside the loop would mark every
    # intermediate union for caching as well.
    return combined.cache()

In [15]:
def pivot_data(df):
    """Reshape per-(Contract, Type) durations into one row per Contract.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Must expose the columns ``Contract``, ``Type`` and ``TotalDuration``.

    Returns
    -------
    pyspark.sql.DataFrame
        Columns ``Contract``, ``TVDuration``, ``MovieDuration``,
        ``RelaxDuration``, ``ChildDuration`` and ``SportDuration`` (in that
        order). A duration is null when the contract has no row of that type.

    Notes
    -----
    This uses a single ``groupBy`` with one conditional sum per content type
    instead of five filtered copies stitched together with four full outer
    joins. For input that has one row per (Contract, Type) the result is the
    same; if a pair occurs more than once, its durations are summed rather
    than producing one output row per combination.
    """
    # (Type label in the data, output column name), in output column order.
    duration_columns = [
        ('Truyền Hình', 'TVDuration'),
        ('Phim Truyện', 'MovieDuration'),
        ('Giải Trí', 'RelaxDuration'),
        ('Thiếu Nhi', 'ChildDuration'),
        ('Thể Thao', 'SportDuration'),
    ]
    known_types = [label for label, _ in duration_columns]

    # Drop rows of any other type first, so a contract that only has
    # unrecognised types does not show up as an all-null row.
    relevant = df.filter(col('Type').isin(known_types))

    # `when` without `otherwise` yields null for non-matching rows, and sum
    # ignores nulls, so each expression totals only its own content type.
    aggregations = [
        sf.sum(when(col('Type') == label, col('TotalDuration'))).alias(column_name)
        for label, column_name in duration_columns
    ]
    return relevant.groupBy('Contract').agg(*aggregations)

In [16]:
def main_task(df, date='2022-04-01', output_path='Result'):
    """Run the pipeline: classify, aggregate, pivot, show and save.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Raw log records with a ``_source`` struct holding ``AppName``,
        ``Contract`` and ``TotalDuration``.
    date : str, optional
        Literal value written to the ``Date`` column of every output row.
    output_path : str, optional
        Directory the CSV result is written to.

    Returns
    -------
    None
        The function works through side effects: it prints progress, shows
        the first 10 result rows and writes a single-partition CSV.

    Notes
    -----
    The write uses Spark's default save mode, so it is expected to fail if
    ``output_path`` already exists -- confirm before re-running.
    """
    df.printSchema()
    source = df.select('_source.*')

    print('------------------------')
    print('Transforming data')
    print('------------------------')
    tv_apps = ['CHANNEL', 'DSHD', 'KPLUS', 'KPlus']
    movie_apps = ['VOD', 'FIMS_RES', 'BHD_RES', 'VOD_RES', 'FIMS', 'BHD', 'DANET']
    app_name = col("AppName")
    # Map each application name to its content category; anything that is not
    # recognised is tagged "Error" and dropped below.
    typed = source.withColumn(
        "Type",
        when(app_name.isin(tv_apps), "Truyền Hình")
        .when(app_name.isin(movie_apps), "Phim Truyện")
        .when(app_name == 'RELAX', "Giải Trí")
        .when(app_name == 'CHILD', "Thiếu Nhi")
        .when(app_name == 'SPORT', "Thể Thao")
        .otherwise("Error"),
    )
    totals = (
        typed.select('Contract', 'Type', 'TotalDuration')
        # Contract '0' and unrecognised application names are excluded.
        .filter(col('Contract') != '0')
        .filter(col('Type') != 'Error')
        .groupBy('Contract', 'Type')
        .agg(sf.sum('TotalDuration').alias('TotalDuration'))
    )

    print('-----------------------------')
    print('Pivoting data')
    print('-----------------------------')
    result = pivot_data(totals)
    # `lit` is not imported anywhere in the notebook; go through the `sf`
    # module alias so this works on a fresh kernel.
    result = result.withColumn('Date', sf.lit(date))

    print('-----------------------------')
    print('Showing result output')
    print('-----------------------------')
    result.show(10, truncate=False)

    print('-----------------------------')
    print('Saving result output')
    print('-----------------------------')
    # repartition(1) collapses the result into a single output part file.
    result.repartition(1).write.csv(output_path)
    print('Task Ran Successfully')

In [17]:
# Directory holding the raw JSON log files, relative to the notebook's
# working directory.
path = "data_log_content//"
# Load and union every file in that directory, then run the full
# transform / pivot / show / save pipeline on the combined data.
data = combine_Table(path)
main_task(data)

root
 |-- _id: string (nullable = true)
 |-- _index: string (nullable = true)
 |-- _score: long (nullable = true)
 |-- _source: struct (nullable = true)
 |    |-- AppName: string (nullable = true)
 |    |-- Contract: string (nullable = true)
 |    |-- Mac: string (nullable = true)
 |    |-- TotalDuration: long (nullable = true)
 |-- _type: string (nullable = true)

------------------------
Transforming data
------------------------
-----------------------------
Pivoting data
-----------------------------
-----------------------------
Showing result output
-----------------------------
+---------+----------+-------------+-------------+-------------+-------------+----------+
|Contract |TVDuration|MovieDuration|RelaxDuration|ChildDuration|SportDuration|Date      |
+---------+----------+-------------+-------------+-------------+-------------+----------+
|AGAAA0848|12141     |null         |null         |null         |null         |2022-04-01|
|AGAAA1147|1299590   |null         |103         