In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql.functions import when
from pyspark.sql.functions import col
from pyspark.sql.types import * 
from pyspark.sql.functions import lit
import pyodbc
import pandas as pd
import pyspark.sql.functions as sf


In [2]:
spark = SparkSession.builder.config("spark.driver.memory", "9g").getOrCreate()

In [3]:
def calculate_data_daily(path, from_date, to_date):
    start = int(from_date[-2:])
    end = int(to_date[-2:])
    
    #create empty DataFrame
    schema = StructType([
        StructField('Contract', StringType(), True),
        StructField('AppName', StringType(), True),
        StructField('TotalDuration', LongType(), True)
        ])
    data = spark.createDataFrame(spark.sparkContext.emptyRDD(),schema)
    
    #get data
    for i in range(start,end+1):
        if i<10:
            file = """2022040{}.json""".format(i)
            str_date = "2022-04-0{}"
        else:
            file = """202204{}.json""".format(i)
            str_date = "2022-04-{}"
        data_date = spark.read.json(path+file)
        data_date = data_date.select( '_source.Contract','_source.AppName', '_source.TotalDuration')
        data = data.union(data_date)
        #data.cache()
    return data

In [4]:
path = "F:\\Study\\X-DATA\\Big Data\\DataSet\\"
from_date = "20220401"
to_date = "20220430"

In [5]:
data = calculate_data_daily(path, from_date,to_date)

In [6]:
data = data.cache()

In [7]:
data.count()

48457499

In [8]:
data.show()

+---------+-------+-------------+
| Contract|AppName|TotalDuration|
+---------+-------+-------------+
|HNH579912|  KPLUS|          254|
|HUFD40665|  KPLUS|         1457|
|HNH572635|  KPLUS|         2318|
|HND141717|  KPLUS|         1452|
|HNH743103|  KPLUS|          251|
|HNH893773|  KPLUS|          924|
|HND083642|  KPLUS|         1444|
|DNFD74404|  KPLUS|          691|
|DTFD21200|  KPLUS|         1436|
|LDFD05747|  KPLUS|         1434|
|HNH063566|  KPLUS|          687|
|HNH866786|  KPLUS|          248|
|NBAAA1128|  KPLUS|          247|
|HNH960439|  KPLUS|          683|
|HNJ035736|  KPLUS|          246|
|NTFD93673|  KPLUS|         2288|
|HNJ063267|  KPLUS|         2282|
|HNH790383|  KPLUS|          906|
|THFD12466|  KPLUS|          242|
|HNH566080|  KPLUS|          242|
+---------+-------+-------------+
only showing top 20 rows



In [9]:
data = data.withColumn("Type",
       when((col("AppName") == 'CHANNEL') | (col("AppName") =='DSHD')| (col("AppName") =='KPLUS')| (col("AppName") =='KPlus'), "Truyền Hình")
      .when((col("AppName") == 'VOD') | (col("AppName") =='FIMS_RES')| (col("AppName") =='BHD_RES')| 
             (col("AppName") =='VOD_RES')| (col("AppName") =='FIMS')| (col("AppName") =='BHD')| (col("AppName") =='DANET'), "Phim Truyện")
      .when((col("AppName") == 'RELAX'), "Giải Trí")
      .when((col("AppName") == 'CHILD'), "Thiếu Nhi")
      .when((col("AppName") == 'SPORT'), "Thể Thao")
      .otherwise("Error"))

In [10]:
data.printSchema()

root
 |-- Contract: string (nullable = true)
 |-- AppName: string (nullable = true)
 |-- TotalDuration: long (nullable = true)
 |-- Type: string (nullable = false)



In [11]:
data = data.drop('AppName')

In [12]:
data.printSchema()

root
 |-- Contract: string (nullable = true)
 |-- TotalDuration: long (nullable = true)
 |-- Type: string (nullable = false)



In [13]:
data.orderBy('Contract').show()

+--------+-------------+-----------+
|Contract|TotalDuration|       Type|
+--------+-------------+-----------+
|        |           52|Truyền Hình|
|        |         3041|Truyền Hình|
|        |           35|Truyền Hình|
|        |          125|      Error|
|        |           84|Truyền Hình|
|        |         4969|Truyền Hình|
|       0|        53318|Truyền Hình|
|       0|        54118|Truyền Hình|
|       0|        17828|Truyền Hình|
|       0|        23176|Truyền Hình|
|       0|        17871|Truyền Hình|
|       0|        17772|Truyền Hình|
|       0|        19302|Truyền Hình|
|       0|        17991|Truyền Hình|
|       0|        10349|Truyền Hình|
|       0|        10462|Truyền Hình|
|       0|        10453|Truyền Hình|
|       0|        10455|Truyền Hình|
|       0|        10321|Truyền Hình|
|       0|        53020|Truyền Hình|
+--------+-------------+-----------+
only showing top 20 rows



In [14]:
data = data.filter((data.Contract !='') & (data.Contract != '0')).orderBy('Contract')

In [15]:
data.show()

+--------------+-------------+-----------+
|      Contract|TotalDuration|       Type|
+--------------+-------------+-----------+
|113.182.209.48|           20|Truyền Hình|
|113.182.209.48|           43|Truyền Hình|
|113.182.209.48|           89|   Giải Trí|
|113.183.15.246|         8203|Truyền Hình|
| 113.183.60.85|         1191|Truyền Hình|
|113.191.153.52|         9497|Truyền Hình|
|116.107.44.221|          193|Truyền Hình|
|116.107.44.221|           57|Phim Truyện|
|116.107.44.221|           74|   Giải Trí|
| 123.17.50.134|          293|Truyền Hình|
| 123.25.79.212|         3361|Truyền Hình|
| 123.25.79.212|         9922|Truyền Hình|
|14.166.200.230|           14|Truyền Hình|
|14.181.179.168|         3142|Truyền Hình|
|14.182.110.125|           92|   Giải Trí|
|14.182.110.125|          404|Truyền Hình|
| 14.182.41.177|           29|Truyền Hình|
|14.189.124.168|          147|Truyền Hình|
|14.189.216.181|         1345|Truyền Hình|
| 14.191.177.89|          164|Truyền Hình|
+----------

In [16]:
data = data.filter(data.Type != 'Error')

In [17]:
data.cache()

DataFrame[Contract: string, TotalDuration: bigint, Type: string]

In [18]:
data.count()

47805576

In [19]:
data.groupBy('Contract','Type').sum('TotalDuration').orderBy('Contract').show()

+---------------+-----------+------------------+
|       Contract|       Type|sum(TotalDuration)|
+---------------+-----------+------------------+
| 113.182.209.48|   Giải Trí|                89|
| 113.182.209.48|Truyền Hình|                63|
| 113.183.15.246|Truyền Hình|              8203|
|  113.183.60.85|Truyền Hình|              1191|
| 113.191.153.52|Truyền Hình|              9497|
| 116.107.44.221|Phim Truyện|                57|
| 116.107.44.221|Truyền Hình|               193|
| 116.107.44.221|   Giải Trí|                74|
|  123.17.50.134|Truyền Hình|               293|
|  123.25.79.212|Truyền Hình|             13283|
| 14.166.200.230|Truyền Hình|                14|
| 14.181.179.168|Truyền Hình|              3142|
| 14.182.110.125|   Giải Trí|                92|
| 14.182.110.125|Truyền Hình|               404|
|  14.182.41.177|Truyền Hình|                29|
| 14.189.124.168|Truyền Hình|               147|
| 14.189.216.181|Truyền Hình|              1345|
|  14.191.177.89|Tru

In [20]:
data = data.groupBy('Contract','Type').sum('TotalDuration').orderBy('Contract')

In [21]:
data.count()

2729276

In [22]:
data = data.withColumnRenamed('sum(TotalDuration)','TotalDuration')

In [23]:
data.show()

+---------------+-----------+-------------+
|       Contract|       Type|TotalDuration|
+---------------+-----------+-------------+
| 113.182.209.48|   Giải Trí|           89|
| 113.182.209.48|Truyền Hình|           63|
| 113.183.15.246|Truyền Hình|         8203|
|  113.183.60.85|Truyền Hình|         1191|
| 113.191.153.52|Truyền Hình|         9497|
| 116.107.44.221|   Giải Trí|           74|
| 116.107.44.221|Phim Truyện|           57|
| 116.107.44.221|Truyền Hình|          193|
|  123.17.50.134|Truyền Hình|          293|
|  123.25.79.212|Truyền Hình|        13283|
| 14.166.200.230|Truyền Hình|           14|
| 14.181.179.168|Truyền Hình|         3142|
| 14.182.110.125|Truyền Hình|          404|
| 14.182.110.125|   Giải Trí|           92|
|  14.182.41.177|Truyền Hình|           29|
| 14.189.124.168|Truyền Hình|          147|
| 14.189.216.181|Truyền Hình|         1345|
|  14.191.177.89|Truyền Hình|          164|
|171.244.225.105|Truyền Hình|          105|
|171.244.225.105|Phim Truyện|   

In [24]:
data.select('Type').distinct().show()

+-----------+
|       Type|
+-----------+
|   Thể Thao|
|Truyền Hình|
|  Thiếu Nhi|
|   Giải Trí|
|Phim Truyện|
+-----------+



In [25]:
result_pivot = data.groupBy("Contract").pivot("Type").sum("TotalDuration")

In [26]:
result_pivot = result_pivot.cache()

In [27]:
result_pivot.show()

+---------------+--------+-----------+---------+--------+-----------+
|       Contract|Giải Trí|Phim Truyện|Thiếu Nhi|Thể Thao|Truyền Hình|
+---------------+--------+-----------+---------+--------+-----------+
| 113.182.209.48|      89|       null|     null|    null|         63|
| 113.183.15.246|    null|       null|     null|    null|       8203|
|  113.183.60.85|    null|       null|     null|    null|       1191|
| 113.191.153.52|    null|       null|     null|    null|       9497|
| 116.107.44.221|      74|         57|     null|    null|        193|
|  123.17.50.134|    null|       null|     null|    null|        293|
|  123.25.79.212|    null|       null|     null|    null|      13283|
| 14.166.200.230|    null|       null|     null|    null|         14|
| 14.181.179.168|    null|       null|     null|    null|       3142|
| 14.182.110.125|      92|       null|     null|    null|        404|
|  14.182.41.177|    null|       null|     null|    null|         29|
| 14.189.124.168|   

In [28]:
from pyspark.sql.window import Window
from pyspark.sql.functions import rank
from pyspark.sql.functions import col

In [29]:
window = Window.partitionBy("Contract").orderBy(col('TotalDuration').desc())

In [30]:
rank_result = data.withColumn('RANK',rank().over(window))

In [31]:
rank_result.filter(rank_result.Contract=='AGAAA0520').show()

+---------+-----------+-------------+----+
| Contract|       Type|TotalDuration|RANK|
+---------+-----------+-------------+----+
|AGAAA0520|Truyền Hình|       173458|   1|
|AGAAA0520|Phim Truyện|          447|   2|
+---------+-----------+-------------+----+



In [32]:
most_watched = rank_result.filter(rank_result.RANK == '1')

In [33]:
most_watched = most_watched.drop('TotalDuration').drop('RANK').withColumnRenamed('Type','Most_Watch')

In [34]:
habbit = result_pivot.join(most_watched,['Contract'],'inner')

In [35]:
habbit = habbit.withColumn('Month',lit('Apr-2022'))

In [36]:
habbit.show()

+--------------+--------+-----------+---------+--------+-----------+-----------+--------+
|      Contract|Giải Trí|Phim Truyện|Thiếu Nhi|Thể Thao|Truyền Hình| Most_Watch|   Month|
+--------------+--------+-----------+---------+--------+-----------+-----------+--------+
|113.182.209.48|      89|       null|     null|    null|         63|   Giải Trí|Apr-2022|
|14.182.110.125|      92|       null|     null|    null|        404|Truyền Hình|Apr-2022|
|14.189.124.168|    null|       null|     null|    null|        147|Truyền Hình|Apr-2022|
|     AGAAA0338|    null|       null|     null|    null|     278633|Truyền Hình|Apr-2022|
|     AGAAA0342|     204|       null|     null|    null|     117788|Truyền Hình|Apr-2022|
|     AGAAA0346|    null|       null|     null|    null|    2056249|Truyền Hình|Apr-2022|
|     AGAAA0353|    null|       1665|     null|    null|      25982|Truyền Hình|Apr-2022|
|     AGAAA0372|    null|       null|     null|    null|      13123|Truyền Hình|Apr-2022|
|     AGAA

Dựa vào kết quả trên biết được thể loại chương trình ưa chuộng của từng người dùng