In [None]:
from pyspark.sql import SparkSession
import seaborn as sns
import matplotlib.pyplot as plt
import pyspark.sql.functions as func
from pyspark.sql.functions import *
from pyspark.sql import functions as F
import warnings
warnings.filterwarnings('ignore')
from pyspark import SQLContext
from pyspark.sql.window import Window
from pyspark.sql.types import FloatType

In [None]:
# Reuse the active Spark session if there is one, otherwise start a new one
# with the builder's default settings.
session_builder = SparkSession.builder
spark = session_builder.getOrCreate()

In [None]:
# sc = spark.sparkContext

# from pyspark.sql import SQLContext
# sqlContext = SQLContext(sc)

# InputPath = ["hdfs://127.0.0.1:9000//user//hadoop//data_call//data//_06//encode//part-00000-d8b8d607-97b6-415d-a081-896ae55d627b-c000.snappy.parquet"]
# df = sqlContext.read.parquet(*InputPath,mergeSchema = True)

In [None]:
# Entry points: the SparkContext behind the session and the legacy SQLContext
# that the rest of the notebook uses for reading files.
sc = spark.sparkContext

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

# One glob per folder (_06, _07, _08); each glob matches the part files of a
# single Parquet write.
InputPath = [
    "hdfs://127.0.0.1:9000//user//hadoop//data_call//data//_06//encode//part-000**-d8b8d607-97b6-415d-a081-896ae55d627b-c000.snappy.parquet",
    "hdfs://127.0.0.1:9000//user//hadoop//data_call//data//_07//encode//part-000**-14c1e4f5-44c4-49c2-ba76-5e766b50ef24-c000.snappy.parquet",
    "hdfs://127.0.0.1:9000//user//hadoop//data_call//data//_08//encode//part-000**-2df50030-400e-495a-aece-d9f7553c8601-c000.snappy.parquet",
]

# mergeSchema=True reconciles the schemas of the individual files into one.
parquet_reader = sqlContext.read
df = parquet_reader.parquet(*InputPath, mergeSchema=True)

In [None]:
# Replace impossible values with null so later aggregations skip them:
#   * 'long' below 0 or at/above 43200 (presumably 12 hours in seconds - TODO confirm unit)
#   * 'location' equal to 0 (presumably a "no location" placeholder - TODO confirm)
long_out_of_range = (col('long') < 0) | (col('long') >= 43200)
df = (
    df
    .withColumn('long', when(long_out_of_range, None).otherwise(col('long')))
    .withColumn('location', when(col('location') == 0, None).otherwise(col('location')))
)

In [None]:
# Keep only records whose two parties differ. Rows where either side is null
# are dropped as well, because the comparison evaluates to null.
df = df.where(col('user') != col('destination'))

In [None]:
# Split the call timestamp into calendar/clock parts, then fold hour, minute
# and second into a single "h,m,s" string column named List_time.
base_columns = ['user', 'time', 'long', 'destination', '_type', 'location', 'target']
call_time = col('time')

# Step 1: materialise every part, including minute and second.
df = df.select(
    *base_columns,
    month(call_time).alias('month'),
    dayofmonth(call_time).alias('dayofmonth'),
    dayofweek(call_time).alias('dayofweek'),
    hour(call_time).alias('hour'),
    minute(call_time).alias('minute'),
    second(call_time).alias('second'),
)

# Step 2: keep the parts used later and replace minute/second with List_time.
df = df.select(
    *base_columns,
    'month',
    'dayofmonth',
    'dayofweek',
    'hour',
    concat(col('hour'), lit(","), col('minute'), lit(","), col('second')).alias('List_time'),
)

In [None]:
# Preview the first 20 rows, truncating long cell values.
df.show(n=20, truncate=True)

# Create Features

## Phone call

In [None]:
# Features: total_call, call_duration, call_duration_mean, call_duration_std,
# call_duration_median - all computed per target over the 'long' column
# (null 'long' values are ignored by every aggregate, including the count).
duration = col('long')
call_total = df.groupBy('target').agg(
    count(duration).alias("total_call"),
    sum(duration).alias('call_duration'),
    mean(duration).alias("call_duration_mean"),
    stddev(duration).alias('call_duration_std'),
    percentile_approx(duration, 0.5).alias("call_duration_median"),
)

# Attach the per-target statistics to every call row.
df = df.join(call_total, on=['target'], how='left_outer')

# Mean and standard deviation are kept to one decimal place.
for stat_column in ("call_duration_mean", "call_duration_std"):
    df = df.withColumn(stat_column, func.round(col(stat_column), 1))

In [None]:
# Feature: total_call_1vs3m - calls in month 6 scaled by 3, relative to the
# call count over all loaded months.
call_1month = (
    df.where(col('month') == '6')
    .groupBy('target')
    .agg(count('long').alias('total_call_1m'))
)
df = df.join(call_1month, on=['target'], how='left_outer')
df = df.withColumn('total_call_1vs3m', (3 * col('total_call_1m')) / col('total_call'))

In [None]:
# Feature: call_in_out_ratio - despite the name, this is call_out / total_call.
# Every row is labelled 'call_in' when _type == 'in' and 'call_out' otherwise
# (including a null _type).
df = df.withColumn(
    'call_type',
    F.when(F.col('_type') == 'in', lit('call_in')).otherwise(lit('call_out')),
)

# One row per target with a column per call_type holding its row count;
# a direction that never occurs for a target becomes 0.
df_type = (
    df.groupBy(['target'])
    .pivot('call_type')
    .agg(count('call_type'))
    .na.fill(value=0)
)
df = df.join(df_type, on=['target'], how='left_outer')

# Ratio rounded to one decimal place.
df = df.withColumn(
    'call_in_out_ratio',
    func.round(col('call_out') / col('total_call'), 1),
)

In [None]:
# Feature: Frequent_contact_count - number of "frequent" contacts of a target,
# outgoing plus incoming. A contact is frequent when the pair has at least
# 5 calls OR a summed 'long' of at least 15.
contact_stats = [
    count('long').alias('total_call_per_1'),
    sum('long').alias('call_duration_per_1'),
]
is_frequent_contact = (col('total_call_per_1') >= 5) | (col('call_duration_per_1') >= 15)

df_frequent_out = (
    df.filter(col('call_type') == 'call_out')
    .groupBy(['target', 'destination'])
    .agg(*contact_stats)
)
df_frequent_out = (
    df_frequent_out.filter(is_frequent_contact)
    .groupBy('target')
    .agg(count('destination').alias('frequent_out_count'))
)

# NOTE(review): incoming calls are grouped by 'destination' here, while the
# call_in ranking cells group by 'user' - confirm which column holds the
# counterpart of an incoming call.
df_frequent_in = (
    df.filter(col('call_type') == 'call_in')
    .groupBy(['target', 'destination'])
    .agg(*contact_stats)
)
df_frequent_in = (
    df_frequent_in.filter(is_frequent_contact)
    .groupBy('target')
    .agg(count('destination').alias('frequent_in_count'))
)

df = (
    df.join(df_frequent_out, on=['target'], how='left_outer')
    .join(df_frequent_in, on=['target'], how='left_outer')
)

# Targets without any frequent contact get 0. Only the two new columns are
# filled: a blanket fill would also turn the nulls deliberately written into
# 'long' and 'location' during cleaning back into 0.
df = df.na.fill(value=0, subset=['frequent_out_count', 'frequent_in_count'])
df = df.withColumn(
    'Frequent_contact_count',
    col('frequent_out_count') + col('frequent_in_count'),
)

In [None]:
# Feature: call_in_out_ratio_1vs3m_diff - the month-6 value of the
# call_out / total ratio minus the same ratio over all loaded months.
df_type_2 = (
    df.filter(col('month') == '6')
    .groupBy(['target'])
    .pivot('call_type')
    .agg(count('call_type'))
)
df_type_2 = df_type_2.na.fill(value=0)

# Suffix the pivot columns so they do not clash with the all-months ones.
for pivot_name, monthly_name in (("call_in", "call_in_1m"), ("call_out", "call_out_1m")):
    df_type_2 = df_type_2.withColumnRenamed(pivot_name, monthly_name)

df = df.join(df_type_2, on=['target'], how='left_outer')
df = df.withColumn(
    'call_in_out_ratio_1m',
    func.round(col('call_out_1m') / col('total_call_1m'), 1),
)
df = df.withColumn(
    'call_in_out_ratio_1vs3m_diff',
    col('call_in_out_ratio_1m') - col('call_in_out_ratio'),
)

In [None]:
# Feature: call_duration_1vs3m - summed 'long' of month 6 scaled by 3,
# relative to the summed 'long' over all loaded months.
duration_1month = (
    df.where(col('month') == '6')
    .groupBy('target')
    .agg(sum('long').alias('call_duration_1m'))
)
df = df.join(duration_1month, on=['target'], how='left_outer')
df = df.withColumn(
    'call_duration_1vs3m',
    (3 * col('call_duration_1m')) / col('call_duration'),
)

In [None]:
# Feature: call_duration_in_out_ratio - outgoing share of a target's summed
# 'long' (call_out_duration / call_duration), rounded to one decimal place.
df_duration = (
    df.groupBy(['target'])
    .pivot('call_type')
    .agg(sum('long'))
    .na.fill(value=0)
)
for pivot_name, duration_name in (("call_in", "call_in_duration"), ("call_out", "call_out_duration")):
    df_duration = df_duration.withColumnRenamed(pivot_name, duration_name)

df = df.join(df_duration, on=['target'], how='left_outer')
df = df.withColumn(
    'call_duration_in_out_ratio',
    func.round(col('call_out_duration') / col('call_duration'), 1),
)



In [None]:
# Feature: call_duration_in_out_ratio_1m_1vs3m_diff - the month-6 outgoing
# share of summed 'long' minus the same share over all loaded months.
df_duration_2 = (
    df.filter(col('month') == '6')
    .groupBy(['target'])
    .pivot('call_type')
    .agg(sum('long'))
)
df_duration_2 = df_duration_2.na.fill(value=0)
for pivot_name, monthly_name in (
    ("call_in", "call_in_duration_1m"),
    ("call_out", "call_out_duration_1m"),
):
    df_duration_2 = df_duration_2.withColumnRenamed(pivot_name, monthly_name)

df = df.join(df_duration_2, on=['target'], how='left_outer')
df = df.withColumn(
    'call_duration_in_out_ratio_1m',
    func.round(col('call_out_duration_1m') / col('call_duration_1m'), 1),
)
df = df.withColumn(
    'call_duration_in_out_ratio_1m_1vs3m_diff',
    col('call_duration_in_out_ratio_1m') - col('call_duration_in_out_ratio'),
)

In [None]:
# Features: call_out_top{1,3,5}_contact_rate - share of a target's outgoing
# calls that go to its 1st / 3rd / 5th most-called destination (each rank on
# its own, not cumulative).
df_call_out = (
    df.filter(col('call_type') == 'call_out')
    .groupBy(['target', 'destination'])
    .count()
    .orderBy(col('target').asc(), col('count').desc())
)

# One window per target, contacts ordered from most to least called.
window = Window.partitionBy('target').orderBy(col('count').desc())

# row_number gives every contact a distinct rank inside its target
# (equal counts are ordered arbitrarily).
df_ranked_call_out = df_call_out.select('*', row_number().over(window).alias('rank'))

df_call_out_one = df_ranked_call_out.filter(col('rank') == 1).select('target', col('count').alias('call_out_top1'))

df_call_out_third = df_ranked_call_out.filter(col('rank') == 3).select('target', col('count').alias('call_out_top3'))

df_call_out_five = df_ranked_call_out.filter(col('rank') == 5).select('target', col('count').alias('call_out_top5'))

df = (
    df.join(df_call_out_one, on=['target'], how='left_outer')
    .join(df_call_out_third, on=['target'], how='left_outer')
    .join(df_call_out_five, on=['target'], how='left_outer')
)

# Targets with fewer than 1/3/5 outgoing contacts get 0 for the missing rank.
# Only the columns joined above are filled: a blanket fill would also turn the
# nulls deliberately written into 'long' and 'location' during cleaning back
# into 0.
df = df.na.fill(value=0, subset=['call_out_top1', 'call_out_top3', 'call_out_top5'])
for top_k in ('top1', 'top3', 'top5'):
    df = df.withColumn(
        f'call_out_{top_k}_contact_rate',
        col(f'call_out_{top_k}') / col('call_out'),
    )

In [None]:
# Features: call_in_top{1,3,5}_contact_rate - share of a target's incoming
# calls that come from its 1st / 3rd / 5th most frequent caller (each rank on
# its own, not cumulative).
df_call_in = (
    df.filter(col('call_type') == 'call_in')
    .groupBy(['target', 'user'])
    .count()
    .orderBy(col('target').asc(), col('count').desc())
)

# One window per target, callers ordered from most to least frequent.
window = Window.partitionBy('target').orderBy(col('count').desc())

# Rank the incoming-call counts. The previous version ranked df_call_out here,
# which made every call_in_top* column a copy of the outgoing figures.
df_ranked_call_in = df_call_in.select('*', row_number().over(window).alias('rank'))

df_call_in_one = df_ranked_call_in.filter(col('rank') == 1).select('target', col('count').alias('call_in_top1'))

df_call_in_third = df_ranked_call_in.filter(col('rank') == 3).select('target', col('count').alias('call_in_top3'))

df_call_in_five = df_ranked_call_in.filter(col('rank') == 5).select('target', col('count').alias('call_in_top5'))

df = (
    df.join(df_call_in_one, on=['target'], how='left_outer')
    .join(df_call_in_third, on=['target'], how='left_outer')
    .join(df_call_in_five, on=['target'], how='left_outer')
)

# Targets with fewer than 1/3/5 callers get 0 for the missing rank. Only the
# columns joined above are filled: a blanket fill would also turn the nulls
# deliberately written into 'long' and 'location' during cleaning back into 0.
df = df.na.fill(value=0, subset=['call_in_top1', 'call_in_top3', 'call_in_top5'])
for top_k in ('top1', 'top3', 'top5'):
    df = df.withColumn(
        f'call_in_{top_k}_contact_rate',
        col(f'call_in_{top_k}') / col('call_in'),
    )

In [None]:
# Features: call_out_top{1,3,5}_contact_rate_1m and their *_1vs3m_diff - the
# month-6 version of the outgoing top-contact rates and its difference from
# the all-months rates.
df_call_out_1month = (
    df.filter((col('call_type') == 'call_out') & (col('month') == '6'))
    .groupBy(['target', 'destination'])
    .count()
    .orderBy(col('target').asc(), col('count').desc())
)

# One window per target, contacts ordered from most to least called.
window = Window.partitionBy('target').orderBy(col('count').desc())

# row_number gives every contact a distinct rank inside its target.
df_ranked_call_out_1m = df_call_out_1month.select('*', row_number().over(window).alias('rank'))

df_call_out_one_1m = df_ranked_call_out_1m.filter(col('rank') == 1).select('target', col('count').alias('call_out_top1_1m'))

df_call_out_third_1m = df_ranked_call_out_1m.filter(col('rank') == 3).select('target', col('count').alias('call_out_top3_1m'))

df_call_out_five_1m = df_ranked_call_out_1m.filter(col('rank') == 5).select('target', col('count').alias('call_out_top5_1m'))

df = (
    df.join(df_call_out_one_1m, on=['target'], how='left_outer')
    .join(df_call_out_third_1m, on=['target'], how='left_outer')
    .join(df_call_out_five_1m, on=['target'], how='left_outer')
)

# Missing ranks become 0. Only the columns joined above are filled: a blanket
# fill would also turn the nulls deliberately written into 'long' and
# 'location' during cleaning back into 0.
df = df.na.fill(value=0, subset=['call_out_top1_1m', 'call_out_top3_1m', 'call_out_top5_1m'])
for top_k in ('top1', 'top3', 'top5'):
    df = df.withColumn(
        f'call_out_{top_k}_contact_rate_1m',
        col(f'call_out_{top_k}_1m') / col('call_out_1m'),
    )
    df = df.withColumn(
        f'call_out_{top_k}_contact_rate_1m_1vs3m_diff',
        col(f'call_out_{top_k}_contact_rate_1m') - col(f'call_out_{top_k}_contact_rate'),
    )

In [None]:
# Features: call_in_top{1,3,5}_contact_rate_1m and their *_1vs3m_diff - the
# month-6 version of the incoming top-caller rates and its difference from
# the all-months rates.
df_call_in_1month = (
    df.filter((col('call_type') == 'call_in') & (col('month') == '6'))
    .groupBy(['target', 'user'])
    .count()
    .orderBy(col('target').asc(), col('count').desc())
)

# One window per target, callers ordered from most to least frequent.
window = Window.partitionBy('target').orderBy(col('count').desc())

# row_number gives every caller a distinct rank inside its target.
df_ranked_call_in_1m = df_call_in_1month.select('*', row_number().over(window).alias('rank'))

# The rank columns are named call_in_top*_1m. The previous version named them
# call_out_top*_1m, which duplicated the outgoing columns and left the
# call_in_top*_1m columns used below undefined.
df_call_in_one_1m = df_ranked_call_in_1m.filter(col('rank') == 1).select('target', col('count').alias('call_in_top1_1m'))

df_call_in_third_1m = df_ranked_call_in_1m.filter(col('rank') == 3).select('target', col('count').alias('call_in_top3_1m'))

df_call_in_five_1m = df_ranked_call_in_1m.filter(col('rank') == 5).select('target', col('count').alias('call_in_top5_1m'))

df = (
    df.join(df_call_in_one_1m, on=['target'], how='left_outer')
    .join(df_call_in_third_1m, on=['target'], how='left_outer')
    .join(df_call_in_five_1m, on=['target'], how='left_outer')
)

# Missing ranks become 0. Only the columns joined above are filled: a blanket
# fill would also turn the nulls deliberately written into 'long' and
# 'location' during cleaning back into 0.
df = df.na.fill(value=0, subset=['call_in_top1_1m', 'call_in_top3_1m', 'call_in_top5_1m'])
for top_k in ('top1', 'top3', 'top5'):
    df = df.withColumn(
        f'call_in_{top_k}_contact_rate_1m',
        col(f'call_in_{top_k}_1m') / col('call_in_1m'),
    )
    df = df.withColumn(
        f'call_in_{top_k}_contact_rate_1m_1vs3m_diff',
        col(f'call_in_{top_k}_contact_rate_1m') - col(f'call_in_{top_k}_contact_rate'),
    )

In [None]:
# Features: call_out_top{1,3,5}_contact_duration_rate - share of a target's
# outgoing summed 'long' that belongs to its 1st / 3rd / 5th contact when
# contacts are ordered by summed 'long' (each rank on its own, not cumulative).
df_call_out_duration = (
    df.filter(col('call_type') == 'call_out')
    .groupBy(['target', 'destination'])
    .agg(sum('long').alias('call_duration'))
    .orderBy(col('target').asc(), col('call_duration').desc())
)

# Rank by call_duration. The shared `window` orders by a 'count' column that
# this aggregate does not have, so it cannot be reused here.
duration_window = Window.partitionBy('target').orderBy(col('call_duration').desc())
df_ranked_call_out_duration = df_call_out_duration.select('*', row_number().over(duration_window).alias('rank'))

df_call_out_duration_one = df_ranked_call_out_duration.filter(col('rank') == 1).select('target', col('call_duration').alias('call_out_duration_top1'))

df_call_out_duration_third = df_ranked_call_out_duration.filter(col('rank') == 3).select('target', col('call_duration').alias('call_out_duration_top3'))

df_call_out_duration_five = df_ranked_call_out_duration.filter(col('rank') == 5).select('target', col('call_duration').alias('call_out_duration_top5'))

df = (
    df.join(df_call_out_duration_one, on=['target'], how='left_outer')
    .join(df_call_out_duration_third, on=['target'], how='left_outer')
    .join(df_call_out_duration_five, on=['target'], how='left_outer')
)

# Missing ranks become 0. Only the columns joined above are filled: a blanket
# fill would also turn the nulls deliberately written into 'long' and
# 'location' during cleaning back into 0.
df = df.na.fill(
    value=0,
    subset=['call_out_duration_top1', 'call_out_duration_top3', 'call_out_duration_top5'],
)
for top_k in ('top1', 'top3', 'top5'):
    df = df.withColumn(
        f'call_out_{top_k}_contact_duration_rate',
        col(f'call_out_duration_{top_k}') / col('call_out_duration'),
    )

In [None]:
# Features: call_out_top{1,3,5}_contact_duration_rate_1m and their
# *_1vs3m_diff - the month-6 version of the outgoing top-contact duration
# rates and its difference from the all-months rates.
df_call_out_duration_1month = (
    df.filter((col('call_type') == 'call_out') & (col('month') == '6'))
    .groupBy(['target', 'destination'])
    .agg(sum('long').alias('call_duration'))
    .orderBy(col('target').asc(), col('call_duration').desc())
)

# Rank by call_duration. The shared `window` orders by a 'count' column that
# this aggregate does not have, so it cannot be reused here.
duration_window = Window.partitionBy('target').orderBy(col('call_duration').desc())
df_ranked_call_out_duration_1m = df_call_out_duration_1month.select('*', row_number().over(duration_window).alias('rank'))

df_call_out_duration_one_1m = df_ranked_call_out_duration_1m.filter(col('rank') == 1).select('target', col('call_duration').alias('call_out_duration_top1_1m'))

df_call_out_duration_third_1m = df_ranked_call_out_duration_1m.filter(col('rank') == 3).select('target', col('call_duration').alias('call_out_duration_top3_1m'))

df_call_out_duration_five_1m = df_ranked_call_out_duration_1m.filter(col('rank') == 5).select('target', col('call_duration').alias('call_out_duration_top5_1m'))

df = (
    df.join(df_call_out_duration_one_1m, on=['target'], how='left_outer')
    .join(df_call_out_duration_third_1m, on=['target'], how='left_outer')
    .join(df_call_out_duration_five_1m, on=['target'], how='left_outer')
)

# Missing ranks become 0. Only the columns joined above are filled: a blanket
# fill would also turn the nulls deliberately written into 'long' and
# 'location' during cleaning back into 0.
df = df.na.fill(
    value=0,
    subset=['call_out_duration_top1_1m', 'call_out_duration_top3_1m', 'call_out_duration_top5_1m'],
)
for top_k in ('top1', 'top3', 'top5'):
    df = df.withColumn(
        f'call_out_{top_k}_contact_duration_rate_1m',
        col(f'call_out_duration_{top_k}_1m') / col('call_out_duration_1m'),
    )
    df = df.withColumn(
        f'call_out_{top_k}_contact_duration_rate_1m_1vs3m_diff',
        col(f'call_out_{top_k}_contact_duration_rate_1m') - col(f'call_out_{top_k}_contact_duration_rate'),
    )

In [None]:
# Features: call_in_top{1,3,5}_contact_duration_rate - share of a target's
# incoming summed 'long' that belongs to its 1st / 3rd / 5th caller when
# callers are ordered by summed 'long' (each rank on its own, not cumulative).
#
# Incoming calls are grouped by 'user', matching the month-6 counterpart of
# this feature and the call_in count rankings; the previous version grouped by
# 'destination', so the 1m-vs-3m differences compared two different groupings.
df_call_in_duration = (
    df.filter(col('call_type') == 'call_in')
    .groupBy(['target', 'user'])
    .agg(sum('long').alias('call_duration'))
    .orderBy(col('target').asc(), col('call_duration').desc())
)

# Rank by call_duration. The shared `window` orders by a 'count' column that
# this aggregate does not have, so it cannot be reused here.
duration_window = Window.partitionBy('target').orderBy(col('call_duration').desc())
df_ranked_call_in_duration = df_call_in_duration.select('*', row_number().over(duration_window).alias('rank'))

df_call_in_duration_one = df_ranked_call_in_duration.filter(col('rank') == 1).select('target', col('call_duration').alias('call_in_duration_top1'))

df_call_in_duration_third = df_ranked_call_in_duration.filter(col('rank') == 3).select('target', col('call_duration').alias('call_in_duration_top3'))

df_call_in_duration_five = df_ranked_call_in_duration.filter(col('rank') == 5).select('target', col('call_duration').alias('call_in_duration_top5'))

df = (
    df.join(df_call_in_duration_one, on=['target'], how='left_outer')
    .join(df_call_in_duration_third, on=['target'], how='left_outer')
    .join(df_call_in_duration_five, on=['target'], how='left_outer')
)

# Missing ranks become 0. Only the columns joined above are filled: a blanket
# fill would also turn the nulls deliberately written into 'long' and
# 'location' during cleaning back into 0.
df = df.na.fill(
    value=0,
    subset=['call_in_duration_top1', 'call_in_duration_top3', 'call_in_duration_top5'],
)
for top_k in ('top1', 'top3', 'top5'):
    df = df.withColumn(
        f'call_in_{top_k}_contact_duration_rate',
        col(f'call_in_duration_{top_k}') / col('call_in_duration'),
    )

In [None]:
# Features: call_in_top{1,3,5}_contact_duration_rate_1m and their
# *_1vs3m_diff - the month-6 version of the incoming top-caller duration
# rates and its difference from the all-months rates.
df_call_in_duration_1month = (
    df.filter((col('call_type') == 'call_in') & (col('month') == '6'))
    .groupBy(['target', 'user'])
    .agg(sum('long').alias('call_duration'))
    .orderBy(col('target').asc(), col('call_duration').desc())
)

# Rank by call_duration. The shared `window` orders by a 'count' column that
# this aggregate does not have, so it cannot be reused here.
duration_window = Window.partitionBy('target').orderBy(col('call_duration').desc())
df_ranked_call_in_duration_1m = df_call_in_duration_1month.select('*', row_number().over(duration_window).alias('rank'))

df_call_in_duration_one_1m = df_ranked_call_in_duration_1m.filter(col('rank') == 1).select('target', col('call_duration').alias('call_in_duration_top1_1m'))

df_call_in_duration_third_1m = df_ranked_call_in_duration_1m.filter(col('rank') == 3).select('target', col('call_duration').alias('call_in_duration_top3_1m'))

df_call_in_duration_five_1m = df_ranked_call_in_duration_1m.filter(col('rank') == 5).select('target', col('call_duration').alias('call_in_duration_top5_1m'))

df = (
    df.join(df_call_in_duration_one_1m, on=['target'], how='left_outer')
    .join(df_call_in_duration_third_1m, on=['target'], how='left_outer')
    .join(df_call_in_duration_five_1m, on=['target'], how='left_outer')
)

# Missing ranks become 0. Only the columns joined above are filled: a blanket
# fill would also turn the nulls deliberately written into 'long' and
# 'location' during cleaning back into 0.
df = df.na.fill(
    value=0,
    subset=['call_in_duration_top1_1m', 'call_in_duration_top3_1m', 'call_in_duration_top5_1m'],
)
for top_k in ('top1', 'top3', 'top5'):
    df = df.withColumn(
        f'call_in_{top_k}_contact_duration_rate_1m',
        col(f'call_in_duration_{top_k}_1m') / col('call_in_duration_1m'),
    )
    df = df.withColumn(
        f'call_in_{top_k}_contact_duration_rate_1m_1vs3m_diff',
        col(f'call_in_{top_k}_contact_duration_rate_1m') - col(f'call_in_{top_k}_contact_duration_rate'),
    )

## Active Time

In [None]:
# Features: call_{morning,afternoon,evening,night}_rate - share of a target's
# calls that fall into each part of the day, rounded to three decimals.
#
# Parts of the day by hour:
#   morning 6-11, afternoon 12-17, evening 18-23, night otherwise (0-5 or null).
# The previous bounds (afternoon < 17, evening >= 18) left hour 17 unmatched,
# so those calls were counted as 'night'.
df = df.withColumn(
    'time_call',
    F.when(F.col('hour').between(6, 11), lit('morning'))
    .when(F.col('hour').between(12, 17), lit('afternoon'))
    .when(F.col('hour').between(18, 23), lit('evening'))
    .otherwise(lit('night')),
)

# One row per target with the number of calls in each part of the day.
df_type_call = df.groupBy(['target']).pivot('time_call').agg(count('time_call'))
df_type_call = df_type_call.fillna(0)

df = df.join(df_type_call, on=['target'], how='left_outer')

# Column order matches the original: morning, afternoon, night, evening.
for day_part in ('morning', 'afternoon', 'night', 'evening'):
    df = df.withColumn(
        f"call_{day_part}_rate",
        func.round(col(day_part) / col('total_call'), 3),
    )

In [None]:
# Features: call_{morning,afternoon,evening,night}_rate_1m and the
# night/morning/evening *_1vs3m_diff columns (month 6 versus all months).
df_type_call_1 = (
    df.filter(col('month') == '6')
    .groupBy(['target'])
    .pivot('time_call')
    .agg(count('time_call'))
)
df_type_call_1 = df_type_call_1.fillna(0)

# Suffix the pivot columns with _1m so they do not clash with the all-months ones.
for day_part in ('afternoon', 'evening', 'morning', 'night'):
    df_type_call_1 = df_type_call_1.withColumnRenamed(day_part, day_part + '_1m')

df = df.join(df_type_call_1, on=['target'], how='left_outer')

# Month-6 share of calls per part of the day, rounded to three decimals.
for day_part in ('morning', 'afternoon', 'night', 'evening'):
    df = df.withColumn(
        f"call_{day_part}_rate_1m",
        func.round(col(f'{day_part}_1m') / col('total_call_1m'), 3),
    )

# NOTE(review): the differences are shifted by +1 (unlike the other
# *_1vs3m_diff features) and no afternoon difference is produced - confirm
# both are intentional.
for day_part in ('night', 'morning', 'evening'):
    df = df.withColumn(
        f'call_{day_part}_rate_1vs3m_diff',
        col(f'call_{day_part}_rate_1m') - col(f'call_{day_part}_rate') + 1,
    )

In [None]:
# Features: call_{night,morning,afternoon,evening}_in_out_ratio - outgoing
# calls in a part of the day divided by all calls in that part of the day,
# rounded to three decimals.
def _period_activity(calls, period):
    """Per-target call count and summed 'long' for one time_call value, split by call_type.

    With two aggregates the pivot names its columns
    '<call_type>_call_<period>' and '<call_type>_duration_<period>'.
    """
    return (
        calls.filter(col('time_call') == period)
        .groupBy(['target'])
        .pivot('call_type')
        .agg(
            count('call_type').alias('call_' + period),
            sum('long').alias('duration_' + period),
        )
        .fillna(value=0)
    )


df_active_night = _period_activity(df, 'night')
df_active_morning = _period_activity(df, 'morning')
df_active_afternoon = _period_activity(df, 'afternoon')
df_active_evening = _period_activity(df, 'evening')

for period_stats in (df_active_night, df_active_morning, df_active_afternoon, df_active_evening):
    df = df.join(period_stats, on=['target'], how='left_outer')

for day_part in ('night', 'morning', 'afternoon', 'evening'):
    df = df.withColumn(
        f'call_{day_part}_in_out_ratio',
        func.round(col(f'call_out_call_{day_part}') / col(day_part), 3),
    )

In [None]:
# Features: call_duration_{night,morning,afternoon,evening}_in_out_ratio -
# outgoing share of the summed 'long' within each part of the day
# (out / (out + in)), rounded to three decimals.
for day_part in ('night', 'morning', 'afternoon', 'evening'):
    outgoing = col(f'call_out_duration_{day_part}')
    incoming = col(f'call_in_duration_{day_part}')
    df = df.withColumn(
        f'call_duration_{day_part}_in_out_ratio',
        func.round(outgoing / (outgoing + incoming), 3),
    )

In [None]:
# Feature: active_night_call_rate - per target and month, the number of
# distinct days with at least one 'night' call divided by 30 (a fixed month
# length), rounded to three decimals.
df_day = (
    df.where(col('time_call') == 'night')
    .groupBy(['target', 'month'])
    .agg(count_distinct('dayofmonth').alias('night_day'))
)
df = df.join(df_day, ['target', 'month'], how='left_outer')
df = df.withColumn('active_night_call_rate', func.round(col('night_day') / 30, 3))

In [None]:
# Feature: active_day_of_week - the day of week on which a target has the
# largest summed 'long'.
# (sum / max / min here are the Spark SQL functions from the star import.)
df_active_day = (
    df.groupBy(['target', 'dayofweek'])
    .agg(sum('long').alias('count_day'))
    .orderBy(col('target').asc(), col('count_day').desc())
)
df_active_day_1 = df_active_day.groupBy('target').agg(max('count_day').alias('max_day'))
df_active_day = df_active_day.join(df_active_day_1, on=['target'], how='left_outer')

# Several days can tie for the maximum. Keep exactly one row per target (the
# smallest dayofweek among the tied days); otherwise the join below would
# duplicate every call row of that target.
df_active_day = (
    df_active_day.filter(col('count_day') == col('max_day'))
    .groupBy('target')
    .agg(min('dayofweek').alias('active_day_of_week'))
)

df = df.join(df_active_day, on=['target'], how='left_outer')

In [None]:
# Feature: active_day_of_month_bin - the 5-day slice of the month in which a
# target has the largest summed 'long'.
# Bins (strings): '0' = days 1-5, '1' = 6-10, '2' = 11-15, '3' = 16-20,
# '4' = 21-25, '5' = everything else (days 26-31 or a null day).
day_of_month = F.col('dayofmonth')
df = df.withColumn(
    'active_day_bin',
    F.when(day_of_month.between(1, 5), lit('0'))
    .when(day_of_month.between(6, 10), lit('1'))
    .when(day_of_month.between(11, 15), lit('2'))
    .when(day_of_month.between(16, 20), lit('3'))
    .when(day_of_month.between(21, 25), lit('4'))
    .otherwise(lit('5')),
)

# (sum / max / min here are the Spark SQL functions from the star import.)
df_active_month = (
    df.groupBy(['target', 'active_day_bin'])
    .agg(sum('long').alias('count_day'))
    .orderBy(col('target').asc(), col('count_day').desc())
)
df_active_month_1 = df_active_month.groupBy('target').agg(max('count_day').alias('max_day'))
df_active_month = df_active_month.join(df_active_month_1, on=['target'], how='left_outer')

# Several bins can tie for the maximum. Keep exactly one row per target (the
# smallest bin label among the tied bins); otherwise the join below would
# duplicate every call row of that target.
df_active_month = (
    df_active_month.filter(col('count_day') == col('max_day'))
    .groupBy('target')
    .agg(min('active_day_bin').alias('active_day_of_month_bin'))
)

df = df.join(df_active_month, on=['target'], how='left_outer')

## Location

In [None]:
# location_prefix: the leading digits of the location code followed by "00".
# 5-digit codes keep their first 3 digits, shorter codes their first 2
# (presumably the district part of a ward code - TODO confirm the code layout).
# The 5-digit test is >= 10000: with the previous > 10000 the value 10000
# itself was sliced like a 4-digit code.
location_text = col('location').cast('string')
df = df.withColumn(
    "location_prefix",
    when(col('location') >= 10000, substring(location_text, 1, 3))
    .otherwise(substring(location_text, 0, 2)),
)
df = df.withColumn("location_prefix", concat(col("location_prefix"), lit("00")))

# Feature: loc_to_district_count - number of a user's rows that have a
# location_prefix (rows with a null location are not counted).
df_loc_location = df.groupBy(['user']).agg(count('location_prefix').alias('loc_to_district_count'))
df = df.join(df_loc_location, on=['user'], how='left_outer')

In [None]:
# Feature: full_location_count - number of a user's rows whose location
# differs from its "...00" prefix.
has_full_location = col('location') != col('location_prefix')
df_full_location = (
    df.where(has_full_location)
    .groupBy(['user'])
    .agg(count('location').alias('full_location_count'))
)
df = df.join(df_full_location, on=['user'], how='left_outer')

# Users without any such row get 0 for both location counters.
df = df.fillna(0, subset=['loc_to_district_count', 'full_location_count'])

In [None]:
# Features: full_location_top{1,3,5}_rate - share of a user's full-location
# rows that fall on its 1st / 3rd / 5th most frequent location (each rank on
# its own, not cumulative).
df_top_full_location = (
    df.filter(col('location') != col('location_prefix'))
    .groupBy(['user', 'location'])
    .count()
    .orderBy(col('user').asc(), col('count').desc())
)

# One window per user, locations ordered from most to least frequent.
window = Window.partitionBy('user').orderBy(col('count').desc())

# row_number gives every location a distinct rank inside its user.
df_ranked_full_location = df_top_full_location.select('*', row_number().over(window).alias('rank'))

df_top_location_one = df_ranked_full_location.filter(col('rank') == 1).select('user', col('count').alias('full_location_top1'))

df_top_location_third = df_ranked_full_location.filter(col('rank') == 3).select('user', col('count').alias('full_location_top3'))

df_top_location_five = df_ranked_full_location.filter(col('rank') == 5).select('user', col('count').alias('full_location_top5'))

df = (
    df.join(df_top_location_one, on=['user'], how='left_outer')
    .join(df_top_location_third, on=['user'], how='left_outer')
    .join(df_top_location_five, on=['user'], how='left_outer')
)

# Missing ranks become 0. Only the columns joined above are filled: a blanket
# fill would also turn the nulls deliberately written into 'long' and
# 'location' during cleaning back into 0.
df = df.na.fill(
    value=0,
    subset=['full_location_top1', 'full_location_top3', 'full_location_top5'],
)
for top_k in ('top1', 'top3', 'top5'):
    df = df.withColumn(
        f'full_location_{top_k}_rate',
        col(f'full_location_{top_k}') / col('full_location_count'),
    )

In [None]:
# Features: loc_to_dictrict_top{1,3,5}_rate - share of a user's rows that fall
# on its 1st / 3rd / 5th most frequent location_prefix (each rank on its own,
# not cumulative). The column names keep their original spelling because later
# code may refer to them.
df_top_loc_location = (
    df.groupBy(['user', 'location_prefix'])
    .count()
    .orderBy(col('user').asc(), col('count').desc())
)

# One window per user, prefixes ordered from most to least frequent.
window = Window.partitionBy('user').orderBy(col('count').desc())

# row_number gives every prefix a distinct rank inside its user.
df_ranked_loc_location = df_top_loc_location.select('*', row_number().over(window).alias('rank'))

df_top_loc_location_one = df_ranked_loc_location.filter(col('rank') == 1).select('user', col('count').alias('loc_location_top1'))

df_top_loc_location_third = df_ranked_loc_location.filter(col('rank') == 3).select('user', col('count').alias('loc_location_top3'))

df_top_loc_location_five = df_ranked_loc_location.filter(col('rank') == 5).select('user', col('count').alias('loc_location_top5'))

df = (
    df.join(df_top_loc_location_one, on=['user'], how='left_outer')
    .join(df_top_loc_location_third, on=['user'], how='left_outer')
    .join(df_top_loc_location_five, on=['user'], how='left_outer')
)

# Missing ranks become 0. Only the columns joined above are filled: a blanket
# fill would also turn the nulls deliberately written into 'long' and
# 'location' during cleaning back into 0.
df = df.na.fill(
    value=0,
    subset=['loc_location_top1', 'loc_location_top3', 'loc_location_top5'],
)
for top_k in ('top1', 'top3', 'top5'):
    df = df.withColumn(
        f'loc_to_dictrict_{top_k}_rate',
        col(f'loc_location_{top_k}') / col('loc_to_district_count'),
    )

In [None]:
# Feature: full_location_count_1vs3m - month-6 full-location count scaled by
# 3, relative to the count over all loaded months.
in_month_6 = col('month') == '6'
has_full_location = col('location') != col('location_prefix')
df_full_location_1m = (
    df.where(in_month_6)
    .where(has_full_location)
    .groupBy(['user'])
    .agg(count('location').alias('full_location_count_1m'))
)
df = df.join(df_full_location_1m, on=['user'], how='left_outer')
df = df.withColumn(
    'full_location_count_1vs3m',
    3 * col('full_location_count_1m') / col('full_location_count'),
)

In [None]:
# Feature: loc_to_district_count_1vs3m - month-6 count of rows with a
# location_prefix scaled by 3, relative to the count over all loaded months.
# The month filter was missing before, so the "1m" count equalled the total
# and the ratio was a constant 3.
df_loc_location_1m = (
    df.filter(col('month') == '6')
    .groupBy(['user'])
    .agg(count('location_prefix').alias('loc_to_district_count_1m'))
)
df = df.join(df_loc_location_1m, on=['user'], how='left_outer')
df = df.withColumn(
    'loc_to_district_count_1vs3m',
    3 * col('loc_to_district_count_1m') / col('loc_to_district_count'),
)

In [None]:
# Load the location/coordinate lookup table; the first CSV line is used as the header.
df_lat_loc_location = sqlContext.read.options(header=True).csv('location_data_with_coordinatescoordinates.csv')

In [None]:
# Expose the ward code under the name used as the join key.
df_lat_loc_location = df_lat_loc_location.withColumnRenamed('ward_code', 'location_full')

# Keep only the key and its coordinates, renamed so they do not clash with df's own 'long'.
df_lat_loc_location_full = df_lat_loc_location.select(
    'location_full',
    col('lat').alias('lat_full_location'),
    col('long').alias('long_full_location'),
)

# Attach the coordinates of each record's location.
df = df.join(
    df_lat_loc_location_full,
    df['location'] == df_lat_loc_location_full['location_full'],
    how='left',
)


In [None]:
# Load the postcode lookup table; the first CSV line is used as the header.
df_post_code = sqlContext.read.options(header=True).csv('postcode.csv')

In [None]:
# Leading digits of the location code: 2 (province) / 3 (district) when the code is
# above 10000, otherwise 1 / 2.
df = df.withColumn(
    "province_code_location",
    when(col('location') > 10000, substring(col('location').cast('string'), 1, 2))
    .otherwise(substring(col('location').cast('string'), 0, 1)),
)
df = df.withColumn(
    "district_code_location",
    when(col('location') > 10000, substring(col('location').cast('string'), 1, 3))
    .otherwise(substring(col('location').cast('string'), 0, 2)),
)

# Append a run of zeros (three for the province code, two for the district code).
df = df.withColumn(
    "province_code_location",
    concat(col("province_code_location"), lpad(lit("0"), 3, "0")),
)
df = df.withColumn(
    "district_code_location",
    concat(col("district_code_location"), lpad(lit("0"), 2, "0")),
)

# Ward-level columns are not needed for the province/district lookup.
df_post_code = df_post_code.drop('ward', 'ward_code')

# Look up the province/district row by both codes, then drop the fully identical
# rows left in the joined frame.
df = df.join(
    df_post_code,
    (df['province_code_location'] == df_post_code['province_code'])
    & (df['district_code_location'] == df_post_code['district_code']),
    how='left',
)
df = df.dropDuplicates()

# "<province> <district>" label used later as a join key.
df = df.withColumn('province_district', concat(col('province'), lit(' '), col('district')))

In [None]:
# Rows of the coordinate table without a ward, keyed by "<province> <district>".
df_lat_loc_location_district = (
    df_lat_loc_location.where(col('ward').isNull())
    .withColumn('location_full', concat(col('province'), lit(' '), col('district')))
    .select(
        col('lat').alias('lat_loc_to_district'),
        col('long').alias('long_loc_to_district'),
        col('location_full').alias('location_district'),
    )
)

# Build the matching key on df and attach the coordinates.
df = df.withColumn('province_district', concat(col('province'), lit(' '), col('district')))
df = df.join(
    df_lat_loc_location_district,
    df['province_district'] == df_lat_loc_location_district['location_district'],
    how='left',
)

In [None]:
# Record count per (user, location) and, per user, the largest of those counts.
df_max_active = df.groupBy(['user', 'location']).agg(count('location').alias('count_lo'))
df_max_active_1 = df_max_active.groupBy(['user']).agg(F.max('count_lo').alias('max_active_location'))
df_max_active = df_max_active.join(df_max_active_1, on=['user'], how='left_outer')

# Keep the location(s) whose count equals the user's maximum.
# NOTE(review): tied locations all survive this filter, so a user can get several
# rows here and the joins back onto df below will repeat that user's records.
df_max_active = (
    df_max_active.where(col('count_lo') == col('max_active_location'))
    .select('user', col('location').alias('max_active_location'))
)

# Leading digits of the location code: 2 / 3 when the code is above 10000, otherwise 1 / 2.
df_max_active = df_max_active.withColumn(
    "province_code_location",
    when(col('max_active_location') > 10000, substring(col('max_active_location').cast('string'), 1, 2))
    .otherwise(substring(col('max_active_location').cast('string'), 0, 1)),
)
df_max_active = df_max_active.withColumn(
    "district_code_location",
    when(col('max_active_location') > 10000, substring(col('max_active_location').cast('string'), 1, 3))
    .otherwise(substring(col('max_active_location').cast('string'), 0, 2)),
)

# Append a run of zeros (three for the province code, two for the district code).
df_max_active = df_max_active.withColumn(
    "province_code_location",
    concat(col("province_code_location"), lpad(lit("0"), 3, "0")),
)
df_max_active = df_max_active.withColumn(
    "district_code_location",
    concat(col("district_code_location"), lpad(lit("0"), 2, "0")),
)

# Look up province/district by both codes and drop identical rows.
df_post_code = df_post_code.drop('ward', 'ward_code')
df_max_active = df_max_active.join(
    df_post_code,
    (df_max_active['province_code_location'] == df_post_code['province_code'])
    & (df_max_active['district_code_location'] == df_post_code['district_code']),
    how='left_outer',
)
df_max_active = df_max_active.dropDuplicates()

# "<province> <district>" label of the most frequent location, attached to every record.
df_max_active = df_max_active.withColumn('active_location', concat(col('province'), lit(' '), col('district')))
df_max_active = df_max_active.select('user', 'active_location')
df = df.join(df_max_active, on=['user'], how='left')

# Coordinates of the active location, taken from records whose own district matches it.
df_max_active_lat_long = (
    df.where(col('location_district') == col('active_location'))
    .select(
        'user',
        col('lat_loc_to_district').alias('lat_active_location'),
        col('long_loc_to_district').alias('long_active_location'),
    )
    .dropDuplicates()
)
df = df.join(df_max_active_lat_long, on=['user'], how='left')

In [None]:
# Load the population lookup table; the first CSV line is used as the header.
df_population = sqlContext.read.options(header=True).csv('df_population.csv')

In [None]:
# Rename the lookup's district/province columns so they do not collide with df's own.
df_population = (
    df_population
    .withColumnRenamed('district', 'district_population')
    .withColumnRenamed('province', 'province_population')
)

# "<province> <district>" join key. It deliberately has its own name: calling it
# 'active_location_population' would collide with the renamed 'population' column
# below and leave df with two columns of the same name.
df_population = df_population.withColumn(
    'population_join_key',
    concat(col('province_population'), lit(' '), col('district_population')),
)

# Attach the population attributes of each user's active location, then discard the
# key (it only repeats active_location).
df = df.join(
    df_population,
    df.active_location == df_population.population_join_key,
    how='left_outer',
).drop('population_join_key')

# Prefix the looked-up attributes with 'active_location_'.
df = (
    df
    .withColumnRenamed('area', 'active_location_area')
    .withColumnRenamed('population', 'active_location_population')
    .withColumnRenamed('distrct_vs_province', 'active_location_population_DistrictVsProvince')
    .withColumnRenamed('density', 'active_location_density')
    .withColumnRenamed('loc_income_capita', 'active_location_income_capita')
    .withColumnRenamed('loc_type_enc', 'active_location_loc_type_enc')
)

In [None]:
# Month-6 record count per (user, location) and, per user, the largest of those counts.
df_max_active_1m = (
    df.where(col('month') == 6)
    .groupBy(['user', 'location'])
    .agg(count('location').alias('count_lo'))
)
df_max_active_1_1m = df_max_active_1m.groupBy(['user']).agg(F.max('count_lo').alias('max_active_location'))
df_max_active_1m = df_max_active_1m.join(df_max_active_1_1m, on=['user'], how='left_outer')

# Keep the location(s) whose count equals the user's maximum.
# NOTE(review): tied locations all survive this filter, so a user can get several
# rows here and the joins back onto df below will repeat that user's records.
df_max_active_1m = (
    df_max_active_1m.where(col('count_lo') == col('max_active_location'))
    .select('user', col('location').alias('max_active_location'))
)

# Leading digits of the location code: 2 / 3 when the code is above 10000, otherwise 1 / 2.
df_max_active_1m = df_max_active_1m.withColumn(
    "province_code_location",
    when(col('max_active_location') > 10000, substring(col('max_active_location').cast('string'), 1, 2))
    .otherwise(substring(col('max_active_location').cast('string'), 0, 1)),
)
df_max_active_1m = df_max_active_1m.withColumn(
    "district_code_location",
    when(col('max_active_location') > 10000, substring(col('max_active_location').cast('string'), 1, 3))
    .otherwise(substring(col('max_active_location').cast('string'), 0, 2)),
)

# Append a run of zeros (three for the province code, two for the district code).
df_max_active_1m = df_max_active_1m.withColumn(
    "province_code_location",
    concat(col("province_code_location"), lpad(lit("0"), 3, "0")),
)
df_max_active_1m = df_max_active_1m.withColumn(
    "district_code_location",
    concat(col("district_code_location"), lpad(lit("0"), 2, "0")),
)

# Look up province/district by both codes and drop identical rows.
df_post_code = df_post_code.drop('ward', 'ward_code')
df_max_active_1m = df_max_active_1m.join(
    df_post_code,
    (df_max_active_1m['province_code_location'] == df_post_code['province_code'])
    & (df_max_active_1m['district_code_location'] == df_post_code['district_code']),
    how='left_outer',
)
df_max_active_1m = df_max_active_1m.dropDuplicates()

# "<province> <district>" label of the most frequent month-6 location, attached to every record.
df_max_active_1m = df_max_active_1m.withColumn('active_location_1m', concat(col('province'), lit(' '), col('district')))
df_max_active_1m = df_max_active_1m.select('user', 'active_location_1m')
df = df.join(df_max_active_1m, on=['user'], how='left')

# Coordinates of that location, taken from records whose own district matches it.
df_max_active_lat_long_1m = (
    df.where(col('location_district') == col('active_location_1m'))
    .select(
        'user',
        col('lat_loc_to_district').alias('lat_active_location_1m'),
        col('long_loc_to_district').alias('long_active_location_1m'),
    )
    .dropDuplicates()
)
df = df.join(df_max_active_lat_long_1m, on=['user'], how='left')

In [None]:
# Cast the coordinates to float so they can be used in arithmetic.
# Each column is cast from itself: the longitude columns must not be overwritten
# with the latitude values, otherwise every distance built on them is wrong.
df = df.withColumn("lat_active_location_1m", col('lat_active_location_1m').cast('float'))
df = df.withColumn("long_active_location_1m", col('long_active_location_1m').cast('float'))
df = df.withColumn("lat_active_location", col('lat_active_location').cast('float'))
df = df.withColumn("long_active_location", col('long_active_location').cast('float'))

# Create the new column: straight-line (Euclidean) distance, in raw coordinate units,
# between the month-6 active location and the overall active location.
df = df.withColumn(
    "distance_activeLocation_1vs3m",
    sqrt(
        (df['lat_active_location_1m'] - df['lat_active_location']) ** 2
        + (df['long_active_location_1m'] - df['long_active_location']) ** 2
    ),
)

# Handle the special cases: 0 when there is no month-6 active location, -1 when
# there is no overall active location; the first matching branch wins.
df = df.withColumn(
    "distance_activeLocation_1vs3m",
    when(F.col('active_location_1m').isNull(), 0)
    .when(F.col('active_location').isNull(), -1)
    .otherwise(df.distance_activeLocation_1vs3m),
)

In [None]:
# Rename 'count' to 'count_loc' on the ranked per-user prefix counts.
df_ranked_loc_location = df_ranked_loc_location.withColumnRenamed('count', 'count_loc')

# Second-ranked location prefix of each user, kept only when it has at least 2 records.
df_top_loc_location_two = (
    df_ranked_loc_location.where((col('rank') == 2) & (col('count_loc') >= 2))
    .select('user', col('location_prefix').alias('loc_location_top2'))
)
df = df.join(df_top_loc_location_two, on=['user'], how='left')

# Coordinates of that prefix, taken from records whose district_code equals it.
df_top_loc_location_two_lat_long = (
    df.where(col('district_code') == col('loc_location_top2'))
    .select(
        'user',
        col('lat_loc_to_district').alias('lat_active_location_top2'),
        col('long_loc_to_district').alias('long_active_location_top2'),
    )
    .dropDuplicates()
)
df = df.join(df_top_loc_location_two_lat_long, on=['user'], how='left')

In [None]:
# Cast the second-ranked location's coordinates to float.
df = df.withColumn("lat_active_location_top2", col('lat_active_location_top2').cast('float'))
df = df.withColumn("long_active_location_top2", col('long_active_location_top2').cast('float'))

# Create the new column: straight-line (Euclidean) distance between the active
# location and the second-ranked location.
df = df.withColumn(
    "distance_top1_top2_location",
    sqrt(
        (col('lat_active_location') - col('lat_active_location_top2')) ** 2
        + (col('long_active_location') - col('long_active_location_top2')) ** 2
    ),
)

# Special-case handling, currently disabled:
# df = df.withColumn("distance_top1_top2_location", 
#                    when(F.col('loc_location_top2').isNull(), 0).when(F.col('active_location').isNull(), -1)
#                    .otherwise(df.distance_activeLocation_1vs3m))

In [None]:
# Record count per (user, destination) and, per user, the largest of those counts.
df_destination_max_call = df.groupBy(['user', 'destination']).agg(count('destination').alias('count_lo'))
df_destination_max_call_1 = df_destination_max_call.groupBy(['user']).agg(F.max('count_lo').alias('max_destination'))
df_destination_max_call = df_destination_max_call.join(df_destination_max_call_1, on=['user'], how='left_outer')

# Keep the destination(s) whose count equals the user's maximum.
df_destination_max_call = (
    df_destination_max_call.where(col('count_lo') == col('max_destination'))
    .select('user', col('destination').alias('max_destination_call'))
)

# Active-location coordinates per user, with the key renamed so it can be matched
# against the destination instead of the caller.
df_user_lat_long = (
    df.select(col('user').alias('user_'), 'lat_active_location', 'long_active_location')
    .dropDuplicates()
)

# Look up the coordinates of the top destination (when it also appears as a user).
df_destination_max_call = df_destination_max_call.join(
    df_user_lat_long,
    df_destination_max_call['max_destination_call'] == df_user_lat_long['user_'],
    how='left',
)
df_destination_max_call = df_destination_max_call.select(
    'user',
    'max_destination_call',
    col('lat_active_location').alias('lat_active_location_destination_callCount'),
    col('long_active_location').alias('long_active_location_destination_callCount'),
)

df = df.join(df_destination_max_call, on=['user'], how='left')


In [None]:
# Cast the top-destination coordinates to float.
df = df.withColumn(
    "lat_active_location_destination_callCount",
    col('lat_active_location_destination_callCount').cast('float'),
)
df = df.withColumn(
    "long_active_location_destination_callCount",
    col('long_active_location_destination_callCount').cast('float'),
)

# Create the new column: straight-line (Euclidean) distance between the user's active
# location and that of the destination with the most records.
df = df.withColumn(
    "distance_top1_contact_callCount",
    sqrt(
        (col('lat_active_location') - col('lat_active_location_destination_callCount')) ** 2
        + (col('long_active_location') - col('long_active_location_destination_callCount')) ** 2
    ),
)


In [None]:
# Sum of 'long' per (user, destination) and, per user, the largest of those sums.
df_destination_max_duration = df.groupBy(['user', 'destination']).agg(F.sum('long').alias('sum_lo'))
df_destination_max_duration_1 = df_destination_max_duration.groupBy(['user']).agg(F.max('sum_lo').alias('max_duration_destination'))
df_destination_max_duration = df_destination_max_duration.join(df_destination_max_duration_1, on=['user'], how='left_outer')

# Keep the destination(s) whose sum equals the user's maximum.
df_destination_max_duration = (
    df_destination_max_duration.where(col('sum_lo') == col('max_duration_destination'))
    .select('user', col('destination').alias('max_destination_duration'))
)

# Active-location coordinates per user, with the key renamed so it can be matched
# against the destination instead of the caller.
df_user_lat_long = (
    df.select(col('user').alias('user_'), 'lat_active_location', 'long_active_location')
    .dropDuplicates()
)

# Look up the coordinates of the top destination (when it also appears as a user).
df_destination_max_duration = df_destination_max_duration.join(
    df_user_lat_long,
    df_destination_max_duration['max_destination_duration'] == df_user_lat_long['user_'],
    how='left',
)
df_destination_max_duration = df_destination_max_duration.select(
    'user',
    'max_destination_duration',
    col('lat_active_location').alias('lat_active_location_destination_callDuration'),
    col('long_active_location').alias('long_active_location_destination_callDuration'),
)

df = df.join(df_destination_max_duration, on=['user'], how='left')

In [None]:
# Cast the top-destination coordinates to float.
df = df.withColumn(
    "lat_active_location_destination_callDuration",
    col('lat_active_location_destination_callDuration').cast('float'),
)
df = df.withColumn(
    "long_active_location_destination_callDuration",
    col('long_active_location_destination_callDuration').cast('float'),
)

# Create the new column: straight-line (Euclidean) distance between the user's active
# location and that of the destination with the largest summed 'long'.
df = df.withColumn(
    "distance_top1_contact_callDuration",
    sqrt(
        (col('lat_active_location') - col('lat_active_location_destination_callDuration')) ** 2
        + (col('long_active_location') - col('long_active_location_destination_callDuration')) ** 2
    ),
)
