In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.functions import col , when
import os
from os import listdir
from pyspark.sql.types import StructType,StructField, StringType , LongType
import numpy as np
import pandas as pd
import psycopg2
from sqlalchemy import create_engine

In [2]:
# Shared Spark session for the whole notebook. getOrCreate() hands back an
# already-running session if there is one, so the driver-memory setting only
# matters when this call actually starts a new session.
spark = (
    SparkSession.builder
    .config("spark.driver.memory", "5g")
    .getOrCreate()
)

In [3]:
# Directory containing the raw JSON log files. Set the LOG_CONTENT_DIR
# environment variable to point elsewhere instead of editing this absolute
# Windows path.
path = os.environ.get("LOG_CONTENT_DIR", "C:\\log_content\\")

In [4]:
def retrieve_data_from_source(path, filename):
    """Read one JSON log file and flatten its ``_source`` struct into top-level columns.

    Parameters
    ----------
    path : str
        Directory that contains the log file. A trailing separator is optional.
    filename : str
        Name of the file inside ``path`` to read.

    Returns
    -------
    pyspark.sql.DataFrame
        One column per field nested under ``_source`` in the JSON records.
        A preview is printed via ``show()`` as a side effect.
    """
    print("------------------------------------------")
    print("Retrieving data from source")
    print("------------------------------------------")
    # os.path.join works whether or not `path` ends with a separator; plain
    # string concatenation silently produced a wrong path when it did not.
    df = spark.read.json(os.path.join(path, filename))
    df = df.select('_source.*')
    df.show()
    print("Processed data of {}".format(filename))
    return df

In [5]:
def preprocess_data(df):
    """Label every row with a content category derived from ``AppName``.

    The category labels are Vietnamese: "Truyền Hình" (TV), "Phim Truyện"
    (movies), "Giải Trí" (entertainment), "Thiếu Nhi" (kids) and "Thể Thao"
    (sports). Any ``AppName`` not listed below, including null, is labelled
    "Error".

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Must contain ``AppName``, ``Contract`` and ``TotalDuration`` columns.

    Returns
    -------
    pyspark.sql.DataFrame
        Columns ``Contract``, ``Type`` and ``TotalDuration``. Intermediate
        previews are printed via ``show()``.
    """
    print("------------------------------------------")
    print("Categorizing data")
    print("------------------------------------------")
    # Category label -> AppName values belonging to it. Matching is exact and
    # case-sensitive, which is why both "KPLUS" and "KPlus" are listed.
    # Insertion order is preserved, so the CASE WHEN branches keep this order.
    app_names_by_type = {
        "Truyền Hình": ["CHANNEL", "DSHD", "KPLUS", "KPlus"],
        "Phim Truyện": ["VOD", "FIMS_RES", "BHD_RES", "VOD_RES", "FIMS", "BHD", "DANET"],
        "Giải Trí": ["RELAX"],
        "Thiếu Nhi": ["CHILD"],
        "Thể Thao": ["SPORT"],
    }
    type_expr = None
    for type_label, app_names in app_names_by_type.items():
        condition = col("AppName").isin(app_names)
        if type_expr is None:
            type_expr = when(condition, type_label)
        else:
            type_expr = type_expr.when(condition, type_label)
    df = df.withColumn("Type", type_expr.otherwise("Error"))
    df.show()
    print("------------------------------------------")
    print("Finalizing data")
    print("------------------------------------------")
    df = df.select('Contract', 'Type', 'TotalDuration')
    df.show()
    return df


In [6]:
def pivot_data(df):
    """Pivot durations into one row per ``Contract`` and one column per ``Type``.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Must contain ``Contract``, ``Type`` and ``TotalDuration`` columns.

    Returns
    -------
    pyspark.sql.DataFrame
        A ``Contract`` column plus one column per distinct ``Type`` value.
        Each holds the summed ``TotalDuration`` for that (Contract, Type) pair,
        or null where the contract has no rows of that type. A preview is
        printed via ``show()``.
    """
    print("------------------------------------------")
    print("Pivoting data")
    print("------------------------------------------")
    # pivot(...).sum(...) already aggregates per (Contract, Type). A separate
    # groupBy('Contract', 'Type').sum() beforehand only added an extra shuffle
    # without changing the result.
    df_pivot = df.groupBy("Contract").pivot("Type").sum("TotalDuration")
    df_pivot.show()
    return df_pivot

In [7]:
def processing_data(df):
    """Run the transformation stage: categorize rows, then pivot them by contract.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Raw log rows as produced by the ingestion step.

    Returns
    -------
    pyspark.sql.DataFrame
        Output of ``pivot_data`` applied to the output of ``preprocess_data``.
    """
    categorized = preprocess_data(df)
    return pivot_data(categorized)

In [8]:
def load(df, table_name="logcontent",
         url="jdbc:postgresql://localhost:5432/logcontent",
         user=None, password=None):
    """Write ``df`` to a PostgreSQL table over JDBC, overwriting existing contents.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Data to write.
    table_name : str
        Target table, passed as the JDBC ``dbtable`` option.
        NOTE(review): the previous code never set a valid table option, so the
        intended target is unknown; "logcontent" is a guess — confirm.
    url : str
        JDBC connection URL. Keep credentials out of it; they are passed as
        separate options.
    user, password : str, optional
        Database credentials. When omitted, they are read from the PGUSER /
        PGPASSWORD environment variables, falling back to the previously
        hard-coded values.

    Returns
    -------
    bool
        True if the write succeeded. False if it raised; the error is printed
        rather than propagated.
    """
    if user is None:
        user = os.environ.get("PGUSER", "etl")
    if password is None:
        # TODO: remove the hard-coded fallback once PGPASSWORD is always set.
        password = os.environ.get("PGPASSWORD", "demopass")
    try:
        (df.write.mode("overwrite")
            .format("jdbc")
            .option("url", url)
            # Without dbtable (or query) the JDBC writer refuses to run.
            .option("dbtable", table_name)
            .option("user", user)
            .option("password", password)
            .option("driver", "org.postgresql.Driver")
            .save())
    except Exception as e:
        # Best-effort load: report the error and signal failure to the caller.
        print("Data load error: " + str(e))
        return False
    print("Data imported successful")
    return True


In [9]:
def main_task(path, db_url=None, table_name='Newtable'):
    """Ingest every log file in ``path``, pivot it per contract, and store it in PostgreSQL.

    Parameters
    ----------
    path : str
        Directory holding the JSON log files. Only regular files are read;
        subdirectories and other non-file entries are skipped.
    db_url : str, optional
        SQLAlchemy database URL. When omitted, it is read from the
        LOGCONTENT_DB_URL environment variable, falling back to the previously
        hard-coded local URL.
    table_name : str
        Destination table. It is replaced if it already exists.

    Returns
    -------
    None
        Prints a completion message when the write finishes.
    """
    schema = StructType([
        StructField('AppName', StringType(), True),
        StructField('Contract', StringType(), True),
        StructField('Mac', StringType(), True),
        StructField('TotalDuration', LongType(), True)
    ])
    columns = schema.fieldNames()
    # Start from an empty frame with the expected schema so each file can be
    # unioned onto it.
    df = spark.createDataFrame(data=spark.sparkContext.emptyRDD(), schema=schema)
    for filename in listdir(path):
        if not os.path.isfile(os.path.join(path, filename)):
            continue
        df1 = retrieve_data_from_source(path, filename)
        # union() matches columns by position. Selecting the schema's columns
        # by name keeps the layout aligned and drops any extra fields a file
        # might carry.
        df = df.union(df1.select(*columns))
    df = processing_data(df)
    # Collects the whole pivoted result onto the driver.
    pdf = df.toPandas()
    if db_url is None:
        db_url = os.environ.get(
            "LOGCONTENT_DB_URL",
            "postgresql+psycopg2://etl:demopass@localhost/logcontent?client_encoding=utf8",
        )
    engine = create_engine(db_url)
    try:
        pdf.to_sql(table_name, engine, index=False, if_exists='replace')
    finally:
        # Release the engine's pooled connections even if the write fails.
        engine.dispose()
    print("Task Run successfully")

In [10]:
# Run the full pipeline against the configured log directory.
main_task(path=path)

------------------------------------------
Retrieving data from source
------------------------------------------
+-------+---------+------------+-------------+
|AppName| Contract|         Mac|TotalDuration|
+-------+---------+------------+-------------+
|  KPLUS|HNH579912|0C96E62FC55C|          254|
|  KPLUS|HUFD40665|CCEDDC333614|         1457|
|  KPLUS|HNH572635|B068E6A1C5F6|         2318|
|  KPLUS|HND141717|08674EE8D2C2|         1452|
|  KPLUS|HNH743103|402343C25D7D|          251|
|  KPLUS|HNH893773|B84DEE76D3B8|          924|
|  KPLUS|HND083642|B84DEE849A0F|         1444|
|  KPLUS|DNFD74404|90324BB44C39|          691|
|  KPLUS|DTFD21200|B84DEED27709|         1436|
|  KPLUS|LDFD05747|0C96E6C95E53|         1434|
|  KPLUS|HNH063566|B84DEEDD1C85|          687|
|  KPLUS|HNH866786|10394E2790A5|          248|
|  KPLUS|NBAAA1128|10394E47C1AF|          247|
|  KPLUS|HNH960439|B84DEED34371|          683|
|  KPLUS|HNJ035736|CCD4A1FA86A5|          246|
|  KPLUS|NTFD93673|B84DEEEF4763|        

+-------+---------+------------+-------------+
|AppName| Contract|         Mac|TotalDuration|
+-------+---------+------------+-------------+
|CHANNEL|SGH478396|84AA9C924A55|        16813|
|CHANNEL|QID005208|B84DEE4402AC|        16813|
|CHANNEL|SGJ007445|B84DEEF585F1|        16813|
|CHANNEL|DNH076438|10394E27BD0F|        16812|
|CHANNEL|NAD021411|84AA9C3E5525|        16812|
|CHANNEL|QIFD03902|B84DEE43A8F7|        16812|
|CHANNEL|HYD015243|B84DEED38A63|        16812|
|CHANNEL|HPFD69557|08674EF37205|        16811|
|CHANNEL|HNH670857|10394E174687|        16811|
|CHANNEL|SGH477619|90324B0EC525|        16811|
|CHANNEL|HNH805509|B84DEE837978|        16811|
|CHANNEL|HPFD82211|E4AB897E92FF|        16811|
|CHANNEL|HNH904087|18473D53D5F9|        16810|
|CHANNEL|LCFD16773|9C305BD6497B|        16810|
|CHANNEL|HPFD35246|D46A6A45D318|        16810|
|CHANNEL|QNH009930|10394E4B92FD|        16809|
|CHANNEL|BDFD85883|802BF9E0CA11|        16809|
|CHANNEL|DND074241|B046FCADDA76|        16809|
|CHANNEL|VTFD

+-------+---------+------------+-------------+
|AppName| Contract|         Mac|TotalDuration|
+-------+---------+------------+-------------+
|CHANNEL|SGH740229|E4AB89738B00|          781|
|CHANNEL|HGD003644|B84DEED1B88D|          771|
|CHANNEL|SGH339686|E4AB891897D6|          781|
|CHANNEL|SGH205304|10394E3D7D4E|          817|
|CHANNEL|SGD521367|B046FCA6AF4B|          800|
|CHANNEL|BDH025226|D81265356426|          781|
|CHANNEL|SGH704998|08674EE8F53A|          817|
|CHANNEL|VPFD21365|B84DEEF6491C|          781|
|CHANNEL|QBFD03153|0C96E6AA125A|          781|
|CHANNEL|BNFD67898|10394E1935F5|          760|
|CHANNEL|TIFD23531|8CC84BDFA233|          781|
|CHANNEL|HTAAA0872|10394E47FB3A|          760|
|CHANNEL|HNH525654|84AA9C99CCFA|          771|
|CHANNEL|SGH658197|08674EDF44DB|          760|
|CHANNEL|SGD721494|B046FCB90D50|          781|
|CHANNEL|SGH786941|E4AB897EA713|          760|
|CHANNEL|DNFD22848|D46A6A45C205|          781|
|CHANNEL|HDFD10316|B84DEED286B1|          781|
|CHANNEL|BIFD

+-------+---------+------------+-------------+
|AppName| Contract|         Mac|TotalDuration|
+-------+---------+------------+-------------+
|CHANNEL|HGAAA2371|B4B5B6FD43E8|        73731|
|CHANNEL|HDFD70929|1CBFC0F227A8|        73728|
|CHANNEL|NDFD28048|B84DEED214EA|        73726|
|CHANNEL|HDD017864|0C96E6E866EC|        72775|
|CHANNEL|HPFD26768|B84DEE854001|        76216|
|CHANNEL|DAH009048|B84DEED1A788|        72772|
|CHANNEL|VPFD11652|08674EF6EEEF|        72339|
|CHANNEL|BNFD05564|B046FCB7F3D2|        74840|
|CHANNEL|TVFD04744|A86BAD5F6EA6|        75201|
|CHANNEL|SGH510088|B05216503306|        72335|
|CHANNEL|HNH537301|FC017C507BCF|        72763|
|CHANNEL|HMFD08695|8CC84BDFAB88|        76206|
|CHANNEL|SGH596455|0C96E68E52E0|        73715|
|CHANNEL|HDFD68836|1CBFC0F22550|        75185|
|CHANNEL|DAH029541|B84DEED8B963|        73713|
|CHANNEL|NBFD19133|B84DEE8823A4|        72318|
|CHANNEL|HPD059586|4CEBBDE0620B|        74187|
|CHANNEL|QAFD17030|10394E16EFB2|        76185|
|CHANNEL|HYFD

+-------+---------+------------+-------------+
|AppName| Contract|         Mac|TotalDuration|
+-------+---------+------------+-------------+
|CHANNEL|HNH700659|08674EE9564D|        20399|
|CHANNEL|HNH858922|10394E1364D5|        20398|
|CHANNEL|HNH638863|08674EDEA7C1|        20398|
|CHANNEL|HNH969007|E89EB419C363|        20096|
|CHANNEL|LAFD22648|CCEDDC3331C9|        20475|
|CHANNEL|NAFD08593|4CEBBD1B594B|        20095|
|CHANNEL|QNFD06520|84AA9C426521|        20095|
|CHANNEL|HUD001823|F8DA0C2A41AE|        20095|
|CHANNEL|BTD015286|10394E4B1F07|        20095|
|CHANNEL|QNAAA1787|10394E420901|        20095|
|CHANNEL|HNH057333|08674EDEB366|        20094|
|CHANNEL|BTD029699|B046FCC102CF|        20094|
|CHANNEL|SGH611763|485F994DA82D|        20471|
|CHANNEL|HPD065816|B046FCB46203|        20093|
|CHANNEL|BGAAA2405|10394E420C14|        20470|
|CHANNEL|SGH730949|E4AB8927C224|        20469|
|CHANNEL|BDH012376|E4AB8927AAB3|        20396|
|CHANNEL|LSFD15505|B84DEEF614FA|        20157|
|CHANNEL|LSFD