In [2]:

import pyspark
from pyspark.sql import *
import numpy as np
import pandas as pd
from pyspark.sql.functions import *
from pyspark.sql.types import *
import re

In [3]:
spark = SparkSession.builder.appName('Cleaning').getOrCreate()

25/03/27 17:31:58 WARN Utils: Your hostname, MacBook-Pro-cua-Vo-4.local resolves to a loopback address: 127.0.0.1; using 10.10.1.10 instead (on interface en0)
25/03/27 17:31:59 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/03/27 17:31:59 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/03/27 17:32:01 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


# Read File

In [4]:
# Training Set
context = spark.read.csv('../initial_data/train/context.csv',header = True,inferSchema= True)
mpa = spark.read.csv('../initial_data/train/mobile_plan_attr.csv',header = True,inferSchema= True,sep=";")
mpu = spark.read.csv('../initial_data/train/mobile_plan_user.csv',header = True,inferSchema= True)
user = spark.read.csv('../initial_data/train/user.csv',header = True,inferSchema= True)

train_set = {'user': user,'context': context, 'mpa': mpa, 'mpu': mpu }




                                                                                

In [5]:
# Testing Set
context_test = spark.read.csv('../initial_data/test/context.csv',header = True,inferSchema= True)
mpa_test = spark.read.csv('../initial_data/test/mobile_plan_attr.csv',header = True,inferSchema= True,sep=";")
mpu_test = spark.read.csv('../initial_data/test/mobile_plan_user.csv',header = True,inferSchema= True)
user_test = spark.read.csv('../initial_data/test/user.csv',header = True,inferSchema= True)

test_set = {'user': user_test,'context': context_test, 'mpa': mpa_test, 'mpu': mpu_test }

# Preprocessing

## Missing Value

In [6]:
for key,df in train_set.items():
    length = df.count()
    save = []
    for c in df.columns:
        null_count = df.filter(col(c).isNull()).count()  + df.filter(col(c) == 'None').count()
        if null_count != 0:
            save.append({'column' : c ,'count':null_count,'percentage': null_count/length * 100})
    if save:
        missing_df = pd.DataFrame(save)
        print(f'------------------ {key} ----------------------')
        print(missing_df)

                                                                                

------------------ user ----------------------
      column  count  percentage
0  education   4003   34.592119
------------------ mpu ----------------------
        column  count  percentage
0  mobile_plan   2715    5.990600
1       accept   2721    6.003839


In [7]:
for key,df in test_set.items():
    length = df.count()
    save = []
    for c in df.columns:
        null_count = df.filter(col(c).isNull()).count()  + df.filter(col(c) == 'None').count()
        if null_count != 0:
            save.append({'column' : c ,'count':null_count,'percentage': null_count/length * 100})
    if save:
        missing_df = pd.DataFrame(save)
        print(f'------------------ {key} ----------------------')
        print(missing_df)

------------------ user ----------------------
      column  count  percentage
0  education    348   31.294964
------------------ mpu ----------------------
      column  count  percentage
0  coupon_id     87    6.227631
1     accept     92    6.585540


## User

In [8]:
user.show()

+-----+--------------------+------+---+---------+--------------------+--------------+---------------+---------+--------------------+-------------------------+-------+-------+----------+-----------------+--------------+
|   id|                name|gender|age|education|          profession|        income|    living_with|   nation|               phone|                      job|fb_freq|yt_freq|insta_freq|use_less_than_2GB|use_2GB_to_4GB|
+-----+--------------------+------+---+---------+--------------------+--------------+---------------+---------+--------------------+-------------------------+-------+-------+----------+-----------------+--------------+
|11156|        Rachel Gibbs|Female| 21|     None|          Unemployed|        39100$|   Unmarrie d_2|AUSTRALIA|      (08)-8012-7556|               Astronomer|      0|      0|         9|                8|             3|
| 4297|      Karen Anderson| Other| 22|     None|          Unemployed|        41000$|  U nmarr ied_2|  ENGLAND|        943-6

## Name

In [9]:
user = user.drop('name')
user_test = user_test.drop('name')

## Gender

In [10]:
user = user.withColumn('gender',lower(col('gender')))
user_test = user_test.withColumn('gender',lower(col('gender')))

## Education

In [11]:
user = user.withColumn(
    'education', 
    when(col('education').isin(['highschool', 'associate']), 'undergrad')
    .when(col('education') == 'masters', 'postgrad')
    .when(col('education') == 'unknown', 'unknown')
    .otherwise('grad')
)
user_test = user_test.withColumn(
    'education', 
    when(col('education').isin(['highschool', 'associate']), 'undergrad')
    .when(col('education') == 'masters', 'postgrad')
    .when(col('education') == 'unknown', 'unknown')
    .otherwise('grad')
)

## Income

In [12]:
def income_convert(x):
    if 'VND' in x:
        x = int(x.replace('VND','')) 
        x /= 25000
        
    elif '$' in x:
        x =  x.replace('$','')
    if int(x) <= 30000:
        return 'lower'
    elif int(x) <= 58000:
        return 'lower-middle'
    elif int(x) <= 94000:
        return 'middle'
    elif int(x) <=153000:
        return 'upper-middle'
    elif int(x) <=200000:
        return 'upper'
    else:
        return 'extreme upper'
udf_income =  udf(income_convert,StringType())

In [13]:
user = user.withColumn('income', udf_income(col('income')))
user_test = user_test.withColumn('income', udf_income(col('income')))

## Profession

In [14]:
def convert_profession(x):
    if x not in ['unemployed','retired','student']:
        return 'employed'
    else :
        return x
udf_profession =  udf(convert_profession,StringType())

In [15]:
user = user.withColumn('profession', udf_profession(col('profession')))
user_test = user_test.withColumn('profession',udf_profession(col('profession')))

## Living_with

In [16]:

def extract_child(x):

    x = re.sub(r'[a-zA-Z_]+', '', x)  
    return int(x) if x.isdigit() else 0  


def extract_status(x):
    x = re.sub(r'[0-9_ ]+', '', x).lower() 
    return "singled" if x != "married" else "married" 


udf_ext_child = udf(extract_child, IntegerType())  
udf_ext_status = udf(extract_status, StringType())





In [17]:

user = user.withColumn('num_child', udf_ext_child(col('living_with')))
user = user.withColumn('living_with', udf_ext_status(col('living_with')))
user = user.withColumnRenamed('living_with','martial_status')

user_test = user_test.withColumn('num_child', udf_ext_child(col('living_with')))
user_test = user_test.withColumn('living_with', udf_ext_status(col('living_with')))
user_test = user_test.withColumnRenamed('living_with','martial_status')


## Nation

In [18]:
def convert_continent(x):
    continent_map = {
    'US': 'america',
    'ENGLAND': 'europe',
    'CHINA': 'asia',
    'JAPAN': 'asia',
    'RUSSIA': 'europe',
    'BRAZIL': 'america',
    'AUSTRALIA': 'oceania',
    'KOREA': 'asia',
    'DENMARK': 'europe'
}   
    return continent_map.get(x,'unknown')


udf_continent = udf(convert_continent,StringType())

In [19]:
user = user.withColumn('nation', udf_continent(col('nation')))
user = user.withColumnRenamed('nation','continent')

user_test = user_test.withColumn('nation', udf_continent(col('nation')))
user_test = user_test.withColumnRenamed('nation','continent')

## Phone, Job

In [21]:
user = user.drop('phone','job')
user_test = user_test.drop('phone','job')

In [43]:
user.show()

+-----+------+---+---------+----------+------------+--------------+---------+-------+-------+----------+-----------------+--------------+---------+
|   id|gender|age|education|profession|      income|martial_status|continent|fb_freq|yt_freq|insta_freq|use_less_than_2GB|use_2GB_to_4GB|num_child|
+-----+------+---+---------+----------+------------+--------------+---------+-------+-------+----------+-----------------+--------------+---------+
|11156|female| 21|     grad|  employed|lower-middle|       singled|  oceania|      0|      0|         9|                8|             3|        0|
| 4297| other| 22|     grad|  employed|lower-middle|       singled|   europe|      0|      0|        23|                7|             2|        0|
|13301|female| 24|     grad|  employed|lower-middle|       singled|     asia|      0|      0|        23|                5|             2|        0|
| 9920|female| 24|     grad|  employed|lower-middle|       singled|   europe|      0|      0|        21|        

In [44]:
user_test.show()

+-----+------+---+---------+----------+------------+--------------+---------+-------+-------+----------+-----------------+--------------+---------+
|   id|gender|age|education|profession|      income|martial_status|continent|fb_freq|yt_freq|insta_freq|use_less_than_2GB|use_2GB_to_4GB|num_child|
+-----+------+---+---------+----------+------------+--------------+---------+-------+-------+----------+-----------------+--------------+---------+
| 7067|female| 33|     grad|  employed|      middle|       married|     asia|      0|      1|         3|                0|             1|        0|
| 2114|female| 34|     grad|  employed|      middle|       married|     asia|      0|      1|         1|                1|             1|        0|
| 6582|female| 34|     grad|  employed|      middle|       married|  oceania|      0|      1|         3|                1|             0|        0|
|11379|female| 34|     grad|  employed|      middle|       married|  america|      0|      1|         3|        

# Context

In [23]:
context.show(10)

+-----+-------+------------+---------+-------+------------+------------+------------+--------+--------+-----+---------+
|   id|purpose|     go_with|  weather|   time|viettel_no_0|viettel_no_1|viettel_no_2|to_hanoi|to_other|score|direction|
+-----+-------+------------+---------+-------+------------+------------+------------+--------+--------+-----+---------+
|11156| Travel|    Al?!%one|&&!!Sunny|  13:00|           1|           0|           0|       0|       1|    0|        0|
| 4297| Travel|F!ri?end%(s)|  Sun!~ny|9  AM  |           1|           1|           0|       0|       1|    0|        0|
|13301| Travel|  Frie~nd(s)|!&~S!unny|  15:00|           1|           1|           0|       0|       1|    0|        0|
| 9920| Travel|  ?Friend(s)|  Su%n&ny| 2 PM  |           1|           1|           0|       0|       1|   30|        0|
| 8424| Travel|   ?Fa! mily|   Sun%ny|   10AM|           1|           1|           0|       0|       1|    0|        0|
| 6771| Travel|    Fami~!ly|  !%Sunny|  

## Go_with and weather

In [24]:
def standard_str(x):
    return str(re.sub(r"[^a-zA-Z\s]", "", str(x)).replace(' ','').lower())
udf_standard_str = udf(standard_str,StringType())

In [25]:
context =  context.withColumn('go_with', udf_standard_str(col('go_with')))
context =  context.withColumn('weather', udf_standard_str(col('weather')))

context_test =  context_test.withColumn('go_with', udf_standard_str(col('go_with')))
context_test =  context_test.withColumn('weather', udf_standard_str(col('weather')))

## Purpose

In [26]:
context = context.withColumn('purpose', when(col('purpose') == 'Work','work').otherwise('travel'))
context_test = context_test.withColumn('purpose', when(col('purpose') == 'Work','work').otherwise('travel'))

In [27]:
context.show()

+-----+-------+-------+-------+-------+------------+------------+------------+--------+--------+-----+---------+
|   id|purpose|go_with|weather|   time|viettel_no_0|viettel_no_1|viettel_no_2|to_hanoi|to_other|score|direction|
+-----+-------+-------+-------+-------+------------+------------+------------+--------+--------+-----+---------+
|11156| travel|  alone|  sunny|  13:00|           1|           0|           0|       0|       1|    0|        0|
| 4297| travel|friends|  sunny|9  AM  |           1|           1|           0|       0|       1|    0|        0|
|13301| travel|friends|  sunny|  15:00|           1|           1|           0|       0|       1|    0|        0|
| 9920| travel|friends|  sunny| 2 PM  |           1|           1|           0|       0|       1|   30|        0|
| 8424| travel| family|  sunny|   10AM|           1|           1|           0|       0|       1|    0|        0|
| 6771| travel| family|  sunny|  8  AM|           1|           1|           0|       0|       1|

## Time

In [28]:
def convert_time(x):
    if 'AM' in x:
        x = x.replace('AM','')
    elif 'PM' in x:
        x = x.replace('PM','')
        x = int(x.split(':')[0]) + 12
    elif ':00' in x:
        x = x.replace(':00','')
    return x

def time_of_day(x):
    hour = int(convert_time(x))
    if 5 <= hour < 12:
        return 'morning'
    elif 12 <= hour < 17:
        return 'afternoon'
    elif 17 <= hour < 21:
        return 'evening'
    else:
        return 'night'
    
udf_time = udf(time_of_day,StringType())

In [29]:
context =  context.withColumn('time', udf_time(col('time')))
context_test =  context_test.withColumn('time', udf_time(col('time')))

In [30]:
context.show()

+-----+-------+-------+-------+---------+------------+------------+------------+--------+--------+-----+---------+
|   id|purpose|go_with|weather|     time|viettel_no_0|viettel_no_1|viettel_no_2|to_hanoi|to_other|score|direction|
+-----+-------+-------+-------+---------+------------+------------+------------+--------+--------+-----+---------+
|11156| travel|  alone|  sunny|afternoon|           1|           0|           0|       0|       1|    0|        0|
| 4297| travel|friends|  sunny|  morning|           1|           1|           0|       0|       1|    0|        0|
|13301| travel|friends|  sunny|afternoon|           1|           1|           0|       0|       1|    0|        0|
| 9920| travel|friends|  sunny|afternoon|           1|           1|           0|       0|       1|   30|        0|
| 8424| travel| family|  sunny|  morning|           1|           1|           0|       0|       1|    0|        0|
| 6771| travel| family|  sunny|  morning|           1|           1|           0|

In [31]:
context = context.drop('to_hanoi','to_other')
context_test = context_test.drop('to_hanoi','to_other')

In [32]:
context.show()

+-----+-------+-------+-------+---------+------------+------------+------------+-----+---------+
|   id|purpose|go_with|weather|     time|viettel_no_0|viettel_no_1|viettel_no_2|score|direction|
+-----+-------+-------+-------+---------+------------+------------+------------+-----+---------+
|11156| travel|  alone|  sunny|afternoon|           1|           0|           0|    0|        0|
| 4297| travel|friends|  sunny|  morning|           1|           1|           0|    0|        0|
|13301| travel|friends|  sunny|afternoon|           1|           1|           0|    0|        0|
| 9920| travel|friends|  sunny|afternoon|           1|           1|           0|   30|        0|
| 8424| travel| family|  sunny|  morning|           1|           1|           0|    0|        0|
| 6771| travel| family|  sunny|  morning|           1|           1|           0|    0|        0|
|15308| travel| family|  sunny|afternoon|           1|           0|           0|   29|        0|
|14139| travel| family|  sunny

## Mobile Plan Attributes

In [33]:
mpa.show(10)

+---------------+--------------------+------+--------+
|    mobile_plan|         description| price|duration|
+---------------+--------------------+------+--------+
|     DATASILVER|2GB/ day, high speed|100000|     5.0|
|       DATAGOLD|5GB/ day, high speed|200000|     5.0|
|    SOCIALMEDIA|1GB/ dayUnlimited...|150000|     3.0|
|SOCIALMEDIAGOLD|3GB/ day, high sp...|250000|     3.0|
|       DATACALL|2GB/ day, high sp...|200000|     5.0|
+---------------+--------------------+------+--------+



### Mobile_plan

In [34]:
mpa = mpa.withColumn('mobile_plan', lower(col('mobile_plan')))
mpa_test = mpa_test.withColumn('mobile_plan', lower(col('mobile_plan')))

### Duration

In [35]:
mpa = mpa.withColumn('duration', regexp_replace(col('duration'), 'd', ''))
mpa_test = mpa_test.withColumn('duration', regexp_replace(col('duration'), 'd', ''))


###  Data Capacity

In [36]:
mpa = mpa.withColumn('capacity', regexp_extract(col('description'), r'(\d+)GB', 1))
mpa_test = mpa_test.withColumn('capacity', regexp_extract(col('description'), r'(\d+)GB', 1))

### Descripton

In [37]:
def extract_ad(x):
    temp = []
    if 'high speed' in x.lower():
        temp.append('High_Speed')
    if 'unlimited' in x.lower():
        temp.append('Unlimited_Social')
    if 'call' in x.lower():
        temp.append('Calls')
    return ', '.join(temp) if temp else 'None'


udf_extract_ad = udf(extract_ad, StringType())


In [38]:
mpa = mpa.withColumn('description', udf_extract_ad(col('description')))
mpa_test = mpa_test.withColumn('description', udf_extract_ad(col('description')))

In [39]:
mpa.show()

+---------------+--------------------+------+--------+--------+
|    mobile_plan|         description| price|duration|capacity|
+---------------+--------------------+------+--------+--------+
|     datasilver|          High_Speed|100000|     5.0|       2|
|       datagold|          High_Speed|200000|     5.0|       5|
|    socialmedia|    Unlimited_Social|150000|     3.0|       1|
|socialmediagold|High_Speed, Unlim...|250000|     3.0|       3|
|       datacall|High_Speed, Unlim...|200000|     5.0|       2|
+---------------+--------------------+------+--------+--------+



# MPU

In [45]:
mpu.show()

+-------+---------------+------+
|     id|    mobile_plan|accept|
+-------+---------------+------+
|11156.0|     datasilver|   1.0|
| 4297.0|socialmediagold|   1.0|
|13301.0|     datasilver|   1.0|
| 9920.0|socialmediagold|   1.0|
| 8424.0|     datasilver|   1.0|
| 6771.0|socialmediagold|   0.0|
|15308.0|     datasilver|   1.0|
|14139.0|     datasilver|   1.0|
|17824.0|    socialmedia|   1.0|
|16372.0|       datacall|   1.0|
| 1573.0|       datagold|   1.0|
|11835.0|    socialmedia|   1.0|
|19253.0|       datagold|   1.0|
|10399.0|socialmediagold|   1.0|
| 3502.0|    socialmedia|   1.0|
| 3051.0|     datasilver|   0.0|
|17620.0|       datacall|   1.0|
| 9606.0|     datasilver|   1.0|
|15884.0|     datasilver|   0.0|
| 7022.0|socialmediagold|   1.0|
+-------+---------------+------+
only showing top 20 rows



In [40]:
mpu = mpu.withColumn('mobile_plan', lower(col('mobile_plan')))
mpu_test = mpu_test.withColumnRenamed('coupon_id','mobile_plan')
mpu_test = mpu_test.withColumn('mobile_plan', lower(col('mobile_plan')))


In [41]:
mpu = mpu.na.drop()
mpu_test = mpu_test.na.drop()