---
# Useful PySpark / pandas snippets



In [None]:
# Repartition dms_total into a fixed number of partitions.
dms_n_partitions = 150
dms_total = dms_total.repartition(dms_n_partitions)

In [None]:
# Keep the sales rows whose weekStart contains "2016" (SQL LIKE substring match).
# NOTE(review): assumes weekStart is a 'yyyy-...' string/date rendered as text;
# a year(weekStart) = 2016 condition would be more explicit — confirm the column type.
sales_distr = sales.filter('weekStart like "%2016%"')

In [None]:
# scp jupyterhub:/mapr/prdfcmaprwithspark1/production/markets/vietnam/datascience/extra_columns .
# scp file_to_upload jupyterhub:~     -- upload to your own home directory on jupyterhub
# scp file_to_upload jupyterhub:/mapr/prdfcmaprwithspark1/production/markets/vietnam/datascience/

In [None]:
#to timestamp
from pyspark.sql import Column

def to_timestamp(
        column_name: str,
        dest_col_name: str,
        ts_format: str = "yyyy-MM-dd HH:mm:ss") -> Column:
    """Parse a string column into a TimestampType column.

    :param column_name: name of the string column to parse.
    :param dest_col_name: alias given to the resulting column.
    :param ts_format: Spark datetime pattern of the input strings.
    :return: a Column cast to TimestampType and aliased to dest_col_name.
    """
    # This cell only imported Column; without these imports unix_timestamp, col
    # and TimestampType were unresolved unless some other cell happened to import them.
    from pyspark.sql.functions import col, unix_timestamp
    from pyspark.sql.types import TimestampType

    return (unix_timestamp(col(column_name), ts_format)
            .cast(TimestampType())
            .alias(dest_col_name))


In [None]:
# weeks
import pandas as pd
from pyspark.sql import functions as F
from pyspark.sql import SparkSession


def pandas_weeks_table(start_date, end_date=None):
    """
        Creates a Pandas dataframe that represents a week table
        containing the date when a week starts (Monday)
        respectively the date when the week ends (Sunday),
        starting the table from start_date and ending the table at end_date.

        :param start_date:
            Represents the start of the weeks table.
            If the start date is in the middle of a week (not a Monday)
            the table will be started from the most recent Monday.

        :param end_date:
            Represents the end of the weeks table.
            If the end date is in the middle of a week (not a Sunday) the
            table will be ended at the next Sunday.
            If the end_date is not specified then today will be used instead.

        :return:
            DataFrame with columns ``week_start`` and ``week_end`` holding
            ``datetime.date`` values, one row per week.

        >>> pandas_weeks_table(start_date='2018-01-02', end_date='2018-02-01')
           week_start    week_end
        0  2018-01-01  2018-01-07
        1  2018-01-08  2018-01-14
        2  2018-01-15  2018-01-21
        3  2018-01-22  2018-01-28
        4  2018-01-29  2018-02-04

        >>> pandas_weeks_table(
        ...    start_date='2018-01-02 23:34:59',
        ...    end_date='2018-02-01 21:31:51')
           week_start    week_end
        0  2018-01-01  2018-01-07
        1  2018-01-08  2018-01-14
        2  2018-01-15  2018-01-21
        3  2018-01-22  2018-01-28
        4  2018-01-29  2018-02-04
    """
    # Fall back to "now" when no end date is given.
    end_table = pd.Timestamp(end_date) if end_date else pd.Timestamp.today()
    start_table = pd.Timestamp(start_date).normalize()
    six_days = pd.Timedelta(days=6)

    # Mondays from the Monday on/before start_table up to end_table.
    week_start = (
        pd.date_range(start=start_table - six_days, end=end_table, freq='W-MON')
        .to_series()
        .dt.date
        .reset_index(drop=True)
    )

    # Sundays closing those weeks. Both bounds are shifted by 6 days, so every
    # Monday m above pairs with the Sunday m + 6 days and the lengths match.
    week_end = (
        pd.date_range(start=start_table, end=end_table + six_days, freq='W-SUN')
        .to_series()
        .dt.date
        .reset_index(drop=True)
    )

    return pd.DataFrame({'week_start': week_start, 'week_end': week_end})


def spark_weeks_table(start_date, end_date=None):
    """
        Creates a Spark DataFrame that represents a week table
        containing the date when a week starts respectively
        the date when a week ends, plus the week number in the year
        and the year itself.
        The first week will be the week where start_date is,
        and the last week will be the week where end_date is.

        The active Spark session is used (it is created if none exists),
        so no session has to be passed in.

        :param start_date:
            Represents the start of the weeks table.
            If the start date is in the middle of a week (not a Monday)
            the table will be started from the most recent Monday.

        :param end_date:
            Represents the end of the weeks table.
            If the end date is in the middle of a week (not a Sunday)
            the table will be ended at the next Sunday.
            If the end_date is not specified then today will be used instead.

        NOTE: week_number is weekofyear(week_end) while year is
        year(week_end). Around New Year the two can disagree with ISO
        week-year numbering: the week 2015-12-28..2016-01-03 below gets
        week_number=53 together with year=2016. Keep this in mind before
        joining on (year, week_number).

        >>> weeks_table = spark_weeks_table(
        ...    start_date='2015-12-30 23:34:59',
        ...    end_date='2016-01-12 21:31:51')

        >>> weeks_table.printSchema()
        root
         |-- week_start: date (nullable = true)
         |-- week_end: date (nullable = true)
         |-- week_number: integer (nullable = true)
         |-- year: integer (nullable = true)
        <BLANKLINE>

        >>> weeks_table.collect()  # doctest: +NORMALIZE_WHITESPACE
        [Row(week_start=datetime.date(2015, 12, 28),
             week_end=datetime.date(2016, 1, 3),
             week_number=53,
             year=2016),
         Row(week_start=datetime.date(2016, 1, 4),
             week_end=datetime.date(2016, 1, 10),
             week_number=1,
             year=2016),
         Row(week_start=datetime.date(2016, 1, 11),
             week_end=datetime.date(2016, 1, 17),
             week_number=2,
             year=2016)]
    """
    # Since there should be a single Spark session, this returns the active one.
    # That avoids having spark as a parameter; when the doctests run, a local
    # session is created.
    spark = SparkSession.builder.getOrCreate()
    week_table = pandas_weeks_table(start_date, end_date)

    weeks = (
        spark
        .createDataFrame(week_table)
        .withColumn('week_number', F.weekofyear('week_end'))
        .withColumn('year', F.year('week_end')))
    return weeks

In [None]:
# Rows of friso_mapped_all whose province is one of top_provinces.
friso_top_prov = friso_mapped_all.filter(friso_mapped_all["province"].isin(top_provinces))

In [None]:
def days(i):
    """Number of seconds in i days (the window below is ordered on epoch seconds)."""
    return i * 86400

from pyspark.sql.window import Window

# Rows of the same account whose activity date lies in the 7 days up to and
# including the current row's date. Ordering on epoch seconds lets rangeBetween
# be expressed in seconds.
w = (Window
     .partitionBy(F.col("account_id"))
     .orderBy(F.col("activity_date_a").cast("timestamp").cast("long"))
     .rangeBetween(-days(7), 0))

# F.sum is used explicitly: a bare `sum` is Python's builtin unless
# pyspark.sql.functions was star-imported, and the builtin fails on a Column.
df = df.select(
    F.col("*"),
    F.sum("n_days_last_activity").over(w).alias("sum_active_days"),
    F.min("activity_date_a").over(w).alias("min_date"),
    F.max("activity_date_a").over(w).alias("max_date"),
)

# Days between the first and last activity date within the window.
df = df.withColumn(
    "days_diff",
    F.datediff(
        F.to_date(F.unix_timestamp("max_date", "yyyy-MM-dd").cast("timestamp")),
        F.to_date(F.unix_timestamp("min_date", "yyyy-MM-dd").cast("timestamp")),
    ),
)

df = df.withColumn("avg_days_inactivity", F.col("days_diff") / F.col("sum_active_days"))

In [None]:
# Create a table in which every stage's total-kg column is divided by its
# Marco Polo usage mapping (kg per mum), giving a number of mums per stage.
# NOTE(review): sales_stages1 is built in the "pivot by stage" cell; run that cell first.

mp_stages = ['StagePregnant', 'Stage1', 'Stage2', 'Stage3', 'Stage4', 'Stage5']
mp_kg_per_mum = [2.8, 2.9, 4.3, 6, 4.6, 3.4]

sales_mp = sales_stages1
# Divide every stage column by its exact factor. The previous int(kg_mp)
# truncated 2.8 -> 2, 4.3 -> 4, 4.6 -> 4 and so on, inflating the mum counts.
for stage, kg_mp in zip(mp_stages, mp_kg_per_mum):
    sales_mp = sales_mp.withColumn('{}_mums'.format(stage),
                                   sales_mp['{}_totalKG'.format(stage)] / kg_mp)
# Replace nulls/NaNs with 0 in the number of mums.
sales_mp = sales_mp.fillna(0, subset=['{}_mums'.format(stage) for stage in mp_stages])


In [None]:
# Pivot by stage: one column of summed kg per product Type.
sales_stages1 = (sales_stages
                 .groupby('weekStart', 'weekEnd', 'district')
                 .pivot('Type')
                 .sum('summedProductsKG'))

# Rename the pivoted columns to StageX_totalKG.
# The loop variable is deliberately NOT named `col`: that would overwrite the
# pyspark `col` function that later cells call (e.g. to_date(col('act_created'))).
for stage_name in ['Stage1', 'Stage2', 'Stage3', 'Stage4', 'Stage5']:
    sales_stages1 = sales_stages1.withColumnRenamed(stage_name, stage_name + '_totalKG')
for stage_name in ['Pregnant', 'Special']:
    sales_stages1 = sales_stages1.withColumnRenamed(stage_name, 'Stage' + stage_name + '_totalKG')

In [None]:
# Dates need to be formatted without the time part, otherwise timestamps of the
# same day are treated as different days.
act_df = act_df.withColumn('act_day', to_date(col('act_created')))

# Number of activities per account and per day.
activities_per_day = count(col('act_created')).alias('n_activities')
daily_group = act_df.groupby('accountId', 'act_day').agg(activities_per_day)

# One-row DataFrame holding today's date with zero activities.
today_df = spark.createDataFrame([[today, 0]], ['act_day', 'n_activities'])

In [None]:
# Parse CreatedOn ("MM/dd/yyyy HH:mm:ss") and reformat it with from_unixtime.
# 'HH' (hour 0-23) replaces 'hh' (clock hour 1-12): with no AM/PM field in the
# pattern, 'hh' cannot represent afternoon hours. 'yyyy' replaces the truncated 'yyy'.
# NOTE(review): assumes CreatedOn uses a 24-hour clock; if it carries AM/PM use 'hh:mm:ss a'.
tmp_check = accounts_delta.withColumn('CreatedOn_X', from_unixtime(unix_timestamp('CreatedOn', 'MM/dd/yyyy HH:mm:ss')))

In [None]:
# For each account, calculate the day difference with the PREVIOUS row (lag).
# StorageLevel was used below without being imported in this cell.
from pyspark import StorageLevel

df = act_today_df
# Window over each account's rows, ordered by activity day.
my_window = Window.partitionBy("accountId").orderBy("act_day")

# Add a column with the previous activity day of the same account
# (null on each account's first row).
df = df.withColumn("prev_date", F.lag(df.act_day).over(my_window))
# Difference in days between every date and the previous one.
df = df.withColumn('date_diff', datediff(col('act_day'), col('prev_date')))
df = df.repartition(50).persist(StorageLevel.DISK_ONLY)

/media/sf_shared_folder
scp Income-postcode-2004-2014.csv ec2-user@fc1:/home/ec2-user/
and in the other direction (copy from the server to the local machine):
scp ec2-user@fc1:/home/ec2-user/outputNielsen.html outputNielsen.html

In [None]:
# Inner, outer etc joining
# https://pandas.pydata.org/pandas-docs/stable/merging.html

In [None]:
# First format CREATE_DT ("MM/dd/yyyy HH:mm:ss") to the right format.
# 'HH' (hour 0-23) replaces 'hh' (clock hour 1-12): with no AM/PM field in the
# pattern, 'hh' cannot represent afternoon hours.
# NOTE(review): assumes CREATE_DT uses a 24-hour clock; if it carries AM/PM use 'hh:mm:ss a'.
start_table = start_table.withColumn("formatDT", from_unixtime(unix_timestamp('CREATE_DT', 'MM/dd/yyyy HH:mm:ss')))

In [None]:
def rename_columns(df, dict_cols):
    """Return df with each column named by a key of dict_cols renamed to the mapped value."""
    renamed_df = df
    for old_name in dict_cols:
        renamed_df = renamed_df.withColumnRenamed(old_name, dict_cols[old_name])
    return renamed_df

# Nielsen location column names -> short names (phuong/xa = ward,
# quan/huyen = district, thanh pho/tinh = province).
dict_cols = {
    "thong_tin_nielsen_cap_nhat_thuc_te__PHUONG/_XA": "ward",
    "thong_tin_nielsen_cap_nhat_thuc_te__QUAN/_HUYEN": "district",
    "thong_tin_nielsen_cap_nhat_thuc_te__THANH_PHO/TINH": "province",
}

In [None]:
def columns_renamed(df, dict_cols):
    """Return df where every column named by a key of dict_cols gets the mapped name."""
    result = df
    for pair in dict_cols.items():
        result = result.withColumnRenamed(*pair)
    return result


dict_cols = {"ADDR_1": "address", "ADDR_3": "ward", "ADDR_4": "district", "ADDR_5": "province"}
# Address columns to rename, followed by the customer keys and the address
# columns that keep their names.
address_cols = list(dict_cols.keys()) + ["CUST_CD", "DIST_CD", "ADDR_2", "ADDR_6"]
# DataFrame with customers and their addresses, address columns renamed.
cust_address = columns_renamed(cust_v5.select(address_cols), dict_cols)


In [None]:
#Now only get the unique districts among the not mapped ones
unique_dist = cust_address.select("district").distinct()
#print(unique_dist.count())

def remove_accents(input_str):
    """Return input_str with its accents removed via unidecode; None stays None.

    Null districts reach the UDF as None. unidecode expects a string, so
    presumably it would raise on None and fail the Spark job.
    """
    if input_str is None:
        return None
    return unidecode.unidecode(input_str)

# The function itself is the UDF body; the extra lambda wrapper was redundant.
func_udf = udf(remove_accents, StringType())

#List of columns that need to be upper-cased and have their accents removed
list_cols = ["district"]
for column in list_cols:
    unique_dist = unique_dist.withColumn("NoAccent_{}".format(column), func_udf(upper(col(column))))

In [None]:
list_mapping = dms_mapping.collect()
# Parallel lists built from dms_mapping: each "start" value and its formatted
# "format_result", used later as the to_replace / value arguments of replace.
to_replace = [row["start"] for row in list_mapping]
value_replace = [row["format_result"] for row in list_mapping]
    

In [None]:
# Add the formatted district to the unique not-mapped districts: start from the
# accent-free district and replace each mapping "start" value by its "format_result".
# NOTE(review): DataFrame.replace compares whole cell values, not substrings/regex — confirm.
unique_dist = (unique_dist
               .withColumn("format_district", col("NoAccent_district"))
               .replace(to_replace, value_replace, subset=["format_district"]))
if verbose:
    unique_dist.show()
    district_counts = unique_dist.groupby("format_district").count()
    district_counts.sort("count", ascending=False).show()

In [None]:
#ADD COLUMN WITH TOTAL AVERAGE PER YEAR

def sum_(*cols):
    """Return a Column adding up all given Columns, starting from lit(0).

    Lets a list of columns be summed: sum_(*[col(x) for x in names]).
    NOTE(review): Spark's `+` presumably yields null when any operand is null —
    confirm these columns have no nulls, or wrap them in coalesce(..., lit(0)).
    """
    # reduce is not a builtin in Python 3 and neither it nor operator.add was
    # imported in this cell; import both here so the helper is self-contained.
    from functools import reduce
    from operator import add
    return reduce(add, cols, lit(0))

brand_activ = brand_activ.withColumn('total_avgyears', sum_(*[col(x) for x in brand_avg_cols]))

In [None]:
# Build DataFrames containing the accountid and the percentages of the first,
# second and third favourite brands (compact sort_array trick by Emanuel).

def _top3_percentages(df, perc_cols):
    """Return accountid plus the three largest values among perc_cols as max1 >= max2 >= max3."""
    # Descending sorted array of all percentage columns of the row.
    sorted_values = F.sort_array(F.array(*perc_cols), asc=False)
    return (df
            .withColumn('sorted_values', sorted_values)
            .select('accountid',
                    F.col('sorted_values').getItem(0).alias('max1'),
                    F.col('sorted_values').getItem(1).alias('max2'),
                    F.col('sorted_values').getItem(2).alias('max3')))


brand = _top3_percentages(brand_act, brand_perc_cols)
brand_nofil = _top3_percentages(brand_act_nofil, brand_perc_cols)

In [None]:
# Length of a regex match, 0 for a null input. regexp_extract returns null when
# the description itself is null, and len(None) would fail the Spark task.
match_len_udf = udf(lambda xs: len(xs) if xs is not None else 0, IntegerType())
# Backwards-compatible alias. A later cell rebinds `func` to pyspark.sql.functions,
# which is why parse_regex uses match_len_udf directly.
func = match_len_udf

# The brand with each regex expression in it.
# Inside a character class "|" is a literal character, so "[fF]" is used rather
# than "[f|F]" (which would also match "|").
brand_regex = {"Friso": "[fF]riso|FRISO",
               "CPL": "CPL",
               "DutchLady": "DL[^a-zA-Z]|CGHL|DUTCH LADY|DUTCH BABY",
               "Fristi": "[fF]risti|FRISTI",
               # "Nubei": "",  could not find any information about this brand
               "Ovaltine": "[oO]valtine|OVALTINE",
               "POSM": "POSM",
               "TS": "TS",
               "Yomost": "[Yy]omost|YOMOST|YM"}
# Get the brand names. These are just the keys of the regex brands.
brand_names = brand_regex.keys()

# Product description columns searched for every brand regex.
brand_desc_cols = ["PRD_DESC", "PRD_DESC1", "PRD_DESC2"]


def parse_regex(df, name, regex):
    """Add column NAME_all: the longest first-match length of regex over the descriptions.

    For each of PRD_DESC, PRD_DESC1 and PRD_DESC2 the first match of regex is
    extracted and its length taken; NAME_all is the greatest of the three, so a
    value > 0 means at least one description mentions the brand.
    """
    tmp_cols = ["{}_desc{}".format(name, i) for i in range(len(brand_desc_cols))]
    for tmp_col, desc_col in zip(tmp_cols, brand_desc_cols):
        df = df.withColumn(tmp_col, match_len_udf(regexp_extract(col(desc_col), regex, 0)))
    df = df.withColumn("{}_all".format(name), greatest(*tmp_cols))
    for tmp_col in tmp_cols:
        df = df.drop(tmp_col)
    return df


# Do it for all the brands.
for name, regex in brand_regex.items():
    prd_CD = parse_regex(prd_CD, name, regex)

# Order in which brands are tested; the first brand whose NAME_all > 0 wins.
# CPL had a regex but was never returned before. It is checked last so it only
# claims products that would otherwise be "Unknown".
brand_priority = ["Yomost", "Fristi", "Ovaltine", "DutchLady", "POSM", "Friso", "TS", "CPL"]


def parse_brand_col(df):
    """Return the brand of one product row, or "Unknown" when no brand regex matched.

    :param df: a Row carrying a NAME_all field for every brand in brand_priority.
    """
    for brand in brand_priority:
        if getattr(df, "{}_all".format(brand)) > 0:
            return brand
    return "Unknown"

parse_brand = udf(parse_brand_col, StringType())

prd_CD = prd_CD.withColumn("Brand", parse_brand(struct(*prd_CD.columns)))
# Clean up the dataframe.
for name in brand_names:
    prd_CD = prd_CD.drop("{}_all".format(name))

# Cache before the first action so show() and later cells reuse the result.
prd_CD.cache()
prd_CD.show()
prd_CD.printSchema()

In [None]:
not_friso = prd_CD
print(not_friso.count())

# Product size pattern, e.g. "48X110ml", "12X900g" or "1.5KG":
#   group 1: full size incl. optional multiplier, e.g. "48X110"
#   group 2: the multiplier, e.g. "48X"
#   group 3: the unit size, e.g. "110" or "1.5"
#   group 4: the unit, e.g. "ml" or "g"
# The decimal part is optional with a literal separator ([.,]). The old "\d+.\d+"
# let "." match any character and needed at least three characters, and the old
# size regex "\d+|\d+.\d+" always took its first alternative, truncating decimals.
# Raw strings avoid invalid-escape warnings for "\d".
product_size_regex = r'((\d+[Xx])?(\d+(?:[.,]\d+)?))(ML|ml|g|K?G)r?'

# Parse the full product size. This should be something like 30X400 or 10X250.
not_friso = not_friso.withColumn("product_size_full", regexp_extract(col("PRD_DESC1"), product_size_regex, 1))
# Parse the product size. This should be 400 or 250 (also found without an "NX" multiplier).
not_friso = not_friso.withColumn("product_size", regexp_extract(col("PRD_DESC1"), product_size_regex, 3))
# This should be gr or ML
not_friso = not_friso.withColumn("product_dimension", regexp_extract(col("PRD_DESC1"), product_size_regex, 4))
# Show everything
not_friso.select("PRD_DESC1", "product_size_full", "product_size", "product_dimension").show(10, False)

In [None]:
+---------------------------------+-----------------+------------+-----------------+
|PRD_DESC1                        |product_size_full|product_size|product_dimension|
+---------------------------------+-----------------+------------+-----------------+
|UHT DL STRAWBERRY 48X110ml       |48X110           |110         |ml               |
|UHT FRISTI CHOCOLATE 48X110ml    |48X110           |110         |ml               |
|UHT FRISTI STRAWBERRY 48X110ml   |48X110           |110         |ml               |
|DKY YOMOST ORANGE CB 48X180ml    |48X180           |180         |ml               |
|DKY YOMOST STRAWBERRY CB 48X180ml|48X180           |180         |ml               |
|IMP DL VANILLA 12X900g           |12X900           |900         |g                |
|IMP DL VANILLA 24X400g           |24X400           |400         |g                |
|IMP DL VANILLA BIB 24X400g       |24X400           |400         |g                |
|IMP DL VANILLA BIB 24X400g       |24X400           |400         |g                |
|IMP DL123 VANILLA 12X900g PRO    |12X900           |900         |g                |
+---------------------------------+-----------------+------------+-----------------+
only showing top 10 rows

In [None]:
# Remove the CUST_CD / DIST_CD pairs that are already in the Nielsen data,
# because those can already be mapped with the district of the Nielsen file.
key_cols = ["CUST_CD", "DIST_CD"]
# Nielsen keys
dms_nielson_keys = dms_nielson.select(*key_cols).dropDuplicates()
# DMS keys
cust_v5_keys = cust_v5.select(*key_cols).dropDuplicates()
# DMS keys that have no Nielsen counterpart
difference_keys = cust_v5_keys.subtract(dms_nielson_keys)

Spark Profiling:
import spark_df_profiling

Nielson_data.cache()
profile = spark_df_profiling.ProfileReport(df = Nielson_data,spark = spark)

profile.to_file(outputfile = "/mapr/fcmaprwithspark1/test_zone/ds_playground/outputNielsen.html")

- vim editor:
to save and quit, type :wq and press Enter
to start inserting text, press i (or the Insert key)
to leave insert mode, press Esc

In [None]:
# Extract a product category from the product name, using regexes.
# Fixes versus the previous version:
#   - missing comma after the 'clothes' entry (SyntaxError)
#   - '[W|w|aardenbon]' was a character class matching almost any name; now '[W|w]aardenbon'
#   - '[M|madame' had a broken bracket; now '[M|m]adame'
#   - 'wock you' -> 'rock you'
# ("|" inside the remaining [X|x] classes is a literal and is harmless here.)

products_regex = {
    'wellness': '^.*([S|s]auna|[W|w]ellness|[T|t]hermen|[S|s]pa|[B|b]eauty|[H|h]ealth|[Z|z]ontegoed|[E|e]lysium|[O|o]ntspanningspakket).*$',
    'kadeaubon': '^.*([K|k]adobon|[V|v]oucher|[C|c]adeaubon|[W|w]aardenbon).*$',
    'clothes': '^.*([Z|z]alando|[S|s]choenen|[A|a]didas).*$',
    'food': '^.*([P|p]annenkoek|[G|g]ourmet|[T|t]hee|[C|c]hocolade).*$',
    'film': '^.*([F|f]ilm|[P|p]athe).*$',
    'theater': '^.*([S|s]ister act|[W|w]icked|[P|p]oppins|[S|s]aigon|[M|m]usical|[R|r]ang|[W|w]e will rock you).*$',
    'outings': '^.*([Z|z]oo|[A|a]dventure|[A|a]vonturen|[W|w]aarbeek|[E|e]ntree|[W|w]ildlands|[A|a]penheul|[D|d]ierenpark|[P|p]ark|[D|d]elfinarium|[D|d]rievliet|[S|s]ea life|Bobbejaanland|het loo|[C|c]orpus|[K|k]abouterland|[E|e]comare|[P|p]lopsa indoor|[A|a]viodrome|[M|m]adame [T|t]ussauds|[F|f]estival|[S|s]peelland|[B|b]allorig).*$',
    'sport': '^.*([S|s]port|[V|v]oetbal|[M|m]arikenloop|[F|f]itness|[Z|z]wem|[R|r]unner).*$',
    'magazine': '^.*([M|m]agazine|[W|w]eekblad).*$',
    'hotel_holidays': '^.*([H|h]otel|[S|s]eaways|[B|b]elvilla|[V|v]errassingsreis|[B|b]ed & [B|b]reakfast|[W|w]eertebergen|[V|v]akantiewoning).*$',
    'house': '^.*([B|b]adgoed|[M|m]elkopschuimer|[P|p]hilips|[P|p]avina|[R|r]ituals|[D|d]ekbed|[T|t]as|[F|f]atboy|[C|c]up).*$',
}

# Add a column products_cat: the first category (in dict order) whose regex
# matches the product name; names matching no category keep the name itself.
# Every regex is tested against the ORIGINAL name, so a category label can never
# be re-matched by a later regex, which the old chained regexp_replace allowed.
products_cat = F.col('name')
for category, regex in reversed(list(products_regex.items())):
    products_cat = F.when(F.col('name').rlike(regex), category).otherwise(products_cat)

df = EU_orderProduct.select('name', 'artcode').withColumn('products_cat', products_cat)

In [None]:
from pyspark.sql import functions as F

df = emails_part

# Earliest email_sent_date among the email events.
df.agg(F.min(df['email_sent_date'])).show()

# Earliest send date of each trigger mail, ignoring the '0000-00-00' placeholder.
for trigger_col in ['E_sendtriggermail{}date'.format(i) for i in (1, 2, 3)]:
    valid_rows = emails_EU.filter("{} != '0000-00-00'".format(trigger_col))
    valid_rows.select(F.min(emails_EU[trigger_col])).show()


In [None]:
from pyspark.sql import functions as F

# Day difference between the `modified` and `E_email_sent_date` columns.
delta_days_col = F.datediff(emails_EU["modified"], emails_EU["E_email_sent_date"])
df = emails_EU.withColumn("delta_days", delta_days_col)

In [None]:
from pyspark.sql import functions as F

def replace_type(column, value):
    """Return a 0/1 Column: 1 where column equals value, otherwise 0 (also for null)."""
    return F.when(column == value, F.lit(1)).otherwise(F.lit(0))

def sent_date(x, y):
    """Return y where x == "Sent", otherwise SQL NULL.

    The previous otherwise("null") produced the literal STRING "null", which is
    not a missing value and turned the column into a string column.
    """
    return F.when(x == "Sent", y)

df = emails_part

# One 0/1 indicator column per DMDtype event value: new column -> DMDtype value.
event_columns = {
    'sent': 'Sent',
    'open': 'Open',
    'trigger': 'Trigger',
    'click': 'Click',
    'unsubscribe': 'Unsubscribe',
    'softbounce 1x': 'SoftBounce 1x',
    'hardbounce final': 'HardBounce final',
    'spamcomplaint': 'Spamcomplaint',
}
for col_name, event_type in event_columns.items():
    df = df.withColumn(col_name, replace_type(F.col('DMDtype'), event_type))
df = df.withColumn('sent_date', sent_date(F.col("DMDtype"), F.col("DMDlogDate")))

In [None]:
# Per account, sum every 0/1 event indicator column.
event_indicator_cols = ['sent', 'open', 'trigger', 'click', 'unsubscribe',
                        'softbounce 1x', 'hardbounce final', 'spamcomplaint']
df = df.groupBy("accountid").agg({c: 'sum' for c in event_indicator_cols})

In [None]:
from pyspark.sql.types import *
import pyspark.sql.functions as func

def email_sent(t, d):
    """Return d when the event type t is 'Sent', otherwise None."""
    return d if t == 'Sent' else None

# Python UDF returning the date as a string, null for events other than 'Sent'.
email_sent_udf = func.udf(email_sent, StringType())

df_r = df_r.withColumn('email_sent_date', email_sent_udf('DMDtype', 'DMDlogDate'))


In [None]:
delta_cols = ["delta_days_pos", "delta_days_neg", "delta_days_null"]
# Sum each delta column per account, then rename "sum(x)" to "sum_x".
df1_tmp = df1.groupBy("E_accountId").agg({delta_col: "sum" for delta_col in delta_cols})
for delta_col in delta_cols:
    df1_tmp = df1_tmp.withColumnRenamed("sum({})".format(delta_col), "sum_{}".format(delta_col))


In [None]:
from pyspark.sql.types import *
import pyspark.sql.functions as F

def int_to_string(x):
    """Return x as a string, keeping None (SQL null) as None.

    Without the guard, str(None) turned null delta_days into the text "None".
    """
    return None if x is None else str(x)

df = df_cnt

int_to_stringUDF = F.udf(int_to_string, StringType())

df = df.withColumn("delta_days_S", int_to_stringUDF("delta_days"))


In [None]:
print("Number of rows: ", emails_EU.count())

print("Number of distinct accounts: ", emails_EU.select("E_accountid").distinct().count())

# DataFrame.show() prints a table and returns None, so print(..., df.show()) also
# printed "None". Fetch the scalar value instead.
print("Min date in emails: ", emails_EU.agg(F.min(F.col("E_sent_date"))).first()[0])
print("Max date in emails: ", emails_EU.agg(F.max(F.col("E_sent_date"))).first()[0])
# TODO(review): the line below was an incomplete snippet (no DataFrame/groupBy
# before count()) and raised at runtime; complete it or delete it.
# count().sort(desc('count')).show(20,False)

In [None]:
# Campaign-type code -> readable call type ('' is treated as 'Other').
calls_types={'27':'RawData', '29':'RawData', '23':'Recruitment', '24':'Recruitment', '25':'Retention', '26':'Retention', '28':'CheckNewUser', '30':'CheckNewUser', '6':'Hotline', '1':'UpdateProfile', '3':'DropOut', '':'Other'}

# Sorted pivot columns: iterating a set of strings gives an order that can change
# between Python processes (hash randomisation), so the CSV column order varied.
call_type_values = sorted(set(calls_types.values()))

# Yearly count of calls per call type (one column per type, missing counts = 0).
tmp = (phone_df.filter("StateCode!='0'")
       .replace(to_replace=calls_types, value=[], subset=['new_CampaignType'])
       # Any digit run left after the mapping (unknown code) becomes 'Other'.
       .withColumn(colName='new_CampaignType', col=regexp_replace('new_CampaignType', '[0-9]+', 'Other'))
       # Keep only the leading part of ActualEnd by stripping "-MM-dd HH:mm:ss".
       .withColumn(colName='Date', col=regexp_replace('ActualEnd', '-[0-9]+-[0-9]+ [0-9]+:[0-9]+:[0-9]+', ''))
       .groupBy('Date', 'new_CampaignType')
       .count()
       .groupBy('Date')
       .pivot(pivot_col='new_CampaignType', values=call_type_values)
       .sum('count')
       .dropna(subset=['Date'])
       .fillna(0)
       .orderBy('Date'))

# NOTE(review): DataFrameWriter.csv presumably writes a directory of part files at
# this path (despite the .csv suffix) — confirm downstream readers expect that.
tmp.coalesce(1).write.csv('/mapr/fcmaprwithspark1/test_zone/ds_playground/projects/marketing_conversion/2.5_analysis/results/call_types_ts-year.csv', header='true', mode='overwrite')
print('Saved successfully!')
tmp.show()
        

In [None]:
def get_counts_group(df, column):
    """Show the 10 most frequent values of column in df (untruncated).

    Returns whatever DataFrame.show returns.
    """
    counts = df.groupBy(column).count()
    most_frequent_first = counts.sort("count", ascending=False)
    return most_frequent_first.show(10, False)


In [None]:
# Cumulative Sum
# Running total of CombinedLabel per Contact_Id, ordered by Green_ActualEnd.
# NOTE(review): with ORDER BY and no explicit frame, Spark presumably uses a RANGE
# frame, so rows with equal Green_ActualEnd share the same cumsum — confirm intended.
# registerTempTable is the deprecated name of createOrReplaceTempView.

features_tab.registerTempTable("tmp")

df2 = sqlContext.sql("SELECT tmp.*, \
    sum(CombinedLabel) OVER (PARTITION BY Contact_Id ORDER BY Green_ActualEnd) as cumsum FROM tmp")

In [None]:

# From pandas to Spark DataFrame

from pyspark.sql import SQLContext

# Random Forest predictions -> pandas -> Spark. New names are used so that
# `predictions` (presumably a NumPy array; another cell calls predictions.reshape)
# is not overwritten by a Spark DataFrame when the notebook runs top to bottom.
predictions_pdf = pd.DataFrame(predictions)
sqlCtx = SQLContext(sc)

predictions_sdf = sqlCtx.createDataFrame(predictions_pdf)

predictions_sdf.write.parquet("/mapr/fcmaprwithspark1/test_zone/ds_playground/projects/marketing_conversion/4_modelling/data/scikit_random_forest", mode="overwrite")
print('Saved!')

In [None]:
# Write as a data frame: prediction, the two class probabilities and the true label.

# .values replaces DataFrame.as_matrix(), which was removed in pandas 1.0.
labels = test[[label_]].values

res_pdf = pd.DataFrame(
    np.concatenate(
        [predictions.reshape(predictions.shape[0], 1), probabilities, labels],
        axis=1,
    ),
    columns=['Prediction', 'Probability_0', 'Probability_1', 'Label'],
)
res_df = sqlContext.createDataFrame(res_pdf, samplingRatio=0.1)
 

In [None]:
# Importance of Features

In [None]:
def importance_features(model, X):
    """Print a ranking of model's feature importances and plot them as a bar chart.

    :param model: fitted ensemble exposing feature_importances_ and estimators_
                  (presumably a scikit-learn forest — confirm).
    :param X: array whose second dimension is the number of features.
    """
    importances = model.feature_importances_
    # Spread of the importance across the individual trees, used as error bars.
    per_tree = [estimator.feature_importances_ for estimator in model.estimators_]
    std = np.std(per_tree, axis=0)
    order = np.argsort(importances)[::-1]
    n_features = X.shape[1]

    # Print the feature ranking
    print("Feature ranking:")
    for position in range(n_features):
        feature = order[position]
        print("%d. feature %d (%f)" % (position + 1, feature, importances[feature]))

    # Plot the feature importances of the forest
    plt.figure()
    plt.title("Feature importances")
    plt.bar(range(n_features), importances[order],
            color="r", yerr=std[order], align="center")
    plt.xticks(range(n_features), order)
    plt.xlim([-1, n_features])
    plt.show()

In [None]:
importance_features(rf, trainArr)

In [None]:
# Create dummy dataframe

# Each record is a list of (field name, value) pairs.
data = sc.parallelize([
    [('PackSize', 1.0), ('Name', 'A')],
    [('PackSize', 1.0), ('Name', 'B')],
    [('PackSize', 30.0), ('Name', 'C')],
])
# Keep only the values, giving (PackSize, Name) tuples.
data_converted = data.map(lambda pairs: tuple(value for _, value in pairs))

# Define schema
schema = StructType([
    StructField("Packsize", DoubleType(), True),
    StructField("Name", StringType(), True),
])

# Create dataframe
DF = sqlContext.createDataFrame(data_converted, schema)
DF.show()

In [None]:
# Calculate cumulative sum
# Running sum of Packsize over the whole table ordered by Name (no PARTITION BY).
# registerTempTable is the deprecated name of createOrReplaceTempView.

DF.registerTempTable('df')
df2 = sqlContext.sql(" SELECT sum(Packsize) OVER (ORDER BY Name) as cumsum FROM df")
df2.show()

In [None]:
# How to unpack vector
# Load the saved logistic-regression results and inspect the 'probability' column.

results_df = spark.read.parquet("/mapr/fcmaprwithspark1/test_zone/ds_playground/projects/marketing_conversion/4_modelling/data/result_logisticreg")
results_df.select('probability').show(10, False)
results_df.groupBy('probability').count().orderBy(desc('count')).show(20, False)

# Split the probability vector into two float columns via the RDD API.
# NOTE(review): x[3] / x[4] are positional — this assumes column 3 is the probability
# vector and column 4 the prediction; verify with results_df.printSchema()
# (named access such as x['probability'] would not depend on column order).
tmp = results_df.rdd.map(lambda x: (x[4], float(x[3][0]), float(x[3][1]) ) ).toDF(['prediction', 'Prob0', 'Prob1'])
tmp.show(100)
tmp.groupBy('Prob1').count().orderBy(desc('count')).show(20, False)

In [None]:
# Extract cumulative gain

# Load results
results_df = spark.read.parquet("/mapr/fcmaprwithspark1/test_zone/ds_playground/projects/marketing_conversion/4_modelling/data/result_logisticreg")
results_df.groupBy('probability').count().orderBy(desc('count')).show(5, False)
results_df.printSchema()
results_df.show()
#
# Unpack probabilities into (Label, Prob0, Prob1, prediction).
# NOTE(review): positional access assumes column 1 = label, 3 = probability vector,
# 4 = prediction — check against the printSchema() output above.
tmp = results_df.rdd.map(lambda x: (x[1], float(x[3][0]), float(x[3][1]),  x[4]) ).toDF(['Label', 'Prob0', 'Prob1', 'prediction'])
tmp.show()

In [None]:
# Add columns of gain and cost: Gain = Label * prediction * 500 (with 0/1 labels,
# 500 for a correctly predicted positive, else 0) and a flat Cost of 5 per row.
tmp2 = (tmp
        .withColumn('Gain', lit(500))
        .withColumn('Gain', col('Label') * col('prediction') * col('Gain'))
        .withColumn('Cost', lit(5)))
tmp2.show()
tmp2.filter('Label=1 and prediction=1').show()

In [None]:
# Running (cumulative) Gain ordered by `row`.
# NOTE(review): tmp3 is not registered in the visible cells — presumably tmp2 plus a
# `row` index column; make sure it exists before running. The window has no
# PARTITION BY, so presumably all rows are moved to a single partition.
tmp4 = sqlContext.sql(" SELECT row, Label, prediction, Prob0, Prob1, Cost, Gain, sum(Gain) OVER (ORDER BY row) as cumGain FROM tmp3")
tmp4.show(400)
tmp4.registerTempTable('tmp4')

In [None]:
# Running (cumulative) Cost ordered by `row`, next to tmp4's cumulative gain.
tmp5 = sqlContext.sql(" SELECT Label, prediction, Prob0, Prob1, Cost, Gain, cumGain, sum(Cost) OVER (ORDER BY row) as cumCost FROM tmp4")
tmp5.show(400)

In [None]:
# Do a left outer join of table 'left_df' with table 'right_df', on 'col_id'.
# After the join, column 'col_date' is renamed to 'name_date' and re-formatted:
# the original strings are parsed with 'format_src' and written back with 'format_end'.
def join_date(left_df, right_df, col_id, col_date, name_date, format_src, format_end):
    """Left-join right_df's (col_id, col_date) onto left_df and reformat the date.

    :return: left_df's columns plus name_date, produced by from_unixtime with
             format_end (null where left_df has no match in right_df).
    """
    right_dates = (right_df
                   .select(col_id, col_date)
                   .withColumnRenamed(col_date, name_date))
    joined = left_df.join(right_dates, on=col_id, how='left_outer')
    parsed_seconds = unix_timestamp(col(name_date), format_src)
    return joined.withColumn(name_date, from_unixtime(parsed_seconds, format_end))

In [None]:
# Attach the ActualEnd of each GreenId activity to features_tab as Green_ActualEnd.
# 'HH' (hour 0-23) replaces 'hh' (clock hour 1-12). With no AM/PM marker, 'hh'
# mis-handles noon/afternoon values when parsing and drops the AM/PM information
# when formatting, which would also break ordering by Green_ActualEnd.
# NOTE(review): assumes ActualEnd uses a 24-hour clock — confirm against the source data.
features_tab = join_date(features_tab,
                         phone_latest_df.select('ActivityId', 'ActualEnd')
                                        .withColumnRenamed('ActivityId', 'GreenId'),
                         'GreenId',
                         'ActualEnd',
                         'Green_ActualEnd',
                         'yyyy-MM-dd HH:mm:ss',
                         'yyyy-MM-dd HH:mm:ss')

In [None]:
def rename_col(df, suffix='_psam', n_excluded=2):
    """Return df with suffix appended to every column name except the last n_excluded.

    :param df: DataFrame whose columns are renamed (order from df.schema.names).
    :param suffix: text appended to each renamed column (default '_psam').
    :param n_excluded: number of trailing columns left untouched (default 2).
        Careful: this relies on the column order of df.
    :return: the renamed DataFrame.
    """
    old_names = df.schema.names
    # max(..., 0): fewer than n_excluded columns means nothing is renamed.
    n_renamed = max(len(old_names) - n_excluded, 0)
    renamed = df
    for old_name in old_names[:n_renamed]:
        renamed = renamed.withColumnRenamed(old_name, old_name + suffix)
    return renamed

psam_new = rename_col(df=psam)

In [None]:
def rename_columns(df, column_dict):
    """Return df with every column named by a key of column_dict renamed to its value."""
    out = df
    for pair in column_dict.items():
        old_name, new_name = pair
        out = out.withColumnRenamed(old_name, new_name)
    return out

In [None]:
# Rename the columns, then keep only the renamed ones (in column_dict order).
column_dict = {
    "DMDcampaignName": "brand",
    "DMDmailingName": "mailing_name",
    "DMDlogDate": "log_date",
    "DMDtype": "event_type",
    "accountid": "accountid",
    "webpower_id": "mail_id",
}
kept_columns = list(column_dict.values())
Ne = rename_columns(Ne, column_dict).select(kept_columns)
Ne.printSchema()

In [None]:
# Collect the columns with > 95% null-like values into bad_columns.
# Null-like: SQL null, "", "NULL", and the placeholder dates '0000-00-00 00:00:00'
# and '1970-01-01 01:00:00' (only possible in date columns, but checked for every column).

bad_columns = []

# Values that count as missing besides SQL null.
_NULL_LIKE_VALUES = ["", "NULL", '0000-00-00 00:00:00', '1970-01-01 01:00:00']

def count_null(df, column, total):
    """Append column to bad_columns when more than 95% of its rows are null-like.

    :param df: Spark DataFrame containing column.
    :param column: name of the column to check.
    :param total: row count of df (passed in so it is not recounted per column).
    """
    if total <= 0:
        # Empty DataFrame: nothing to measure (previously a ZeroDivisionError).
        return
    values = df[column]
    # The five conditions are mutually exclusive, so one filtered count equals the
    # sum of the five separate counts of the earlier version (one Spark job, not five).
    cnt_missing = df.where(values.isNull() | values.isin(*_NULL_LIKE_VALUES)).count()
    if (cnt_missing / total) * 100 > 95 and column not in bad_columns:
        bad_columns.append('{}'.format(column))

#the list of bad columns is automatically saved

#the list of bad columns is automatically saved

In [None]:
#When the value is between 30 and 0, then return 1 else 0
def smallerThen30(column):
    return when(column < 0, 0).otherwise(\
                when(column < 30, 1).otherwise(\
                    when(column > 30,0)))

# For every pillar, add a "<pillar>_activity" flag computed by smallerThen30 from the
# number of days between the pillar's creation date and the email sendout date.
list_pillar = ["uppc", "cashback", "winaction"]
for pillar in list_pillar:
    created_name = "{}_created".format(pillar)
    flag_name = "{}_activity".format(pillar)
    # datediff(end, start): positive when the pillar was created before the sendout.
    activity = activity.withColumn("diff", datediff(col("sendout_date"), col(created_name)))
    activity = activity.withColumn(flag_name, smallerThen30(col("diff")))

def isActive(column):
    """Return 1 where ``column`` is strictly positive and 0 everywhere else."""
    is_positive = column > 0
    return when(is_positive, 1).otherwise(0)

# An account counts as active when at least one pillar flag is positive.
activity = activity.withColumn(
    "account_isActive",
    isActive(col("uppc_activity") + col("cashback_activity") + col("winaction_activity")))

# Clean up the helper columns. "diff" is dropped too: it only holds the last pillar's
# day difference and would otherwise keep rows that are identical on every remaining
# column from being removed by dropDuplicates() below.
helper_columns = ["diff"]
for pillar in list_pillar:
    helper_columns += ["{}_created".format(pillar), "{}_activity".format(pillar)]
for helper in helper_columns:
    activity = activity.drop(helper)

activity = activity.filter("account_isActive == '1'")
activity = activity.dropDuplicates()
print("Number of persons are active before email :{}".format(activity.count()))

In [None]:
# Given the ts dates and a dataframe of events, create the aggregated time series with counts of events per week
def create_series(dates, events, name):
    """Aggregate ``events`` into weekly counts per (account_isActive, account_isNew).

    Every event is matched to the week of ``dates`` whose inclusive range
    [week_start, week_end] contains its ``sendout_date``. The number of matched rows is
    compared with the number of events and the outcome is printed. A mismatch means
    some events fell outside every week, or inside more than one.

    :param dates: DataFrame with ``week_start`` and ``week_end`` columns.
    :param events: DataFrame with a ``sendout_date`` column (and presumably
        ``account_isActive`` / ``account_isNew``, which the grouping needs — TODO confirm).
    :param name: name given to the resulting count column.
    :returns: DataFrame with week_start, week_end, account_isActive, account_isNew and
        the count column ``name``.
    """
    dates.createOrReplaceTempView("dates")
    events.createOrReplaceTempView("events")
    cnt_all = events.count()
    # Inner join with an explicit ON clause. Events outside every week are dropped,
    # which is exactly what the row-count check below is meant to reveal.
    tmp = sqlContext.sql("SELECT d.*, e.* "
                         "FROM events e JOIN dates d "
                         "ON e.sendout_date >= d.week_start AND e.sendout_date <= d.week_end")
    cnt_res = tmp.count()
    if cnt_all == cnt_res:
        print('\tResult has correct number of rows: ' + str(cnt_res))
    else:
        print('\tResult has incorrect number of rows: ' + str(cnt_res) + ' expected ' + str(cnt_all))
    # Aggregate by week and active/new flags and count.
    res = (tmp.groupBy('week_start', 'week_end', 'account_isActive', 'account_isNew')
              .count()
              .withColumnRenamed('count', name))
    print('\tAggregation rows: ' + str(res.count()))
    return res

# Create start table ts dates X isActive X isNew: every week combined with every
# account_isActive and account_isNew value. CROSS JOIN states the cartesian product
# explicitly; a LEFT OUTER JOIN without a join condition may be rejected by Spark 2.x
# unless spark.sql.crossJoin.enabled is set.
result = sqlContext.sql("SELECT d.*, a.*, n.* "
                        "FROM ts_dates d CROSS JOIN isActive a CROSS JOIN isNew n")
print('Rows: ' + str(result.count()))

# Add one weekly count column per event type; "bounce" covers every event_type that
# contains the word bounce.
series_keys = ['week_start', 'week_end', 'account_isActive', 'account_isNew']
event_filters = [('sendout', "event_type=='sendout'"),
                 ('click', "event_type=='click'"),
                 ('open', "event_type=='open'"),
                 ('bounce', "event_type like '%bounce%'")]
for series_name, condition in event_filters:
    series = create_series(ts_dates, wp.filter(condition), series_name)
    result = result.join(series, on=series_keys, how='left_outer')
    print('Rows: ' + str(result.count()))

result.show(4)

In [None]:
# Extract the brand from mailing_name with one regex per brand.
# Each pattern spans the whole name (^.* ... .*$), so a matching name is replaced by the
# bare brand label. Patterns are applied in dict order to the already-rewritten column.
# Names that match no pattern keep their original mailing_name.
# Character classes are written [Mm] etc.: inside [...] a "|" is a literal character,
# so [M|m] would also have matched "|".
# NOTE(review): the Eurosparen alternatives ES/es/Es match any name containing those
# letters (e.g. "...test..."), and Eurosparen is tried first — confirm this is intended.
brands_regex = {'Eurosparen': '^.*(ES|es|Es|[Ee]urosparen).*$',
                'Milner': '^.*([Mm]ilner|Miln|MILN).*$',
                'Campina': '^.*([Cc]am|AM|[Cc]amp|AMP|[Cc]ampina|CAMPINA).*$',
                'Optimel': '^.*([Oo]ptimel|OPT|opt|Opt).*$',
                'Mona': '^.*([Mm]ona).*$'}

# Start mailing_brand as a copy of mailing_name, then rewrite it brand by brand.
wp_brands = wp.withColumn('mailing_brand', col('mailing_name'))

for name, regex in brands_regex.items():
    wp_brands = wp_brands.withColumn('mailing_brand', regexp_replace('mailing_brand', regex, name))

In [None]:
# Week labels for the plots: the first 10 characters of each distinct week_start
# (presumably the 'YYYY-MM-DD' part of a datetime string — TODO confirm).
weeks = New_Act_pd['week_start'].unique()
dates = [week[:10] for week in weeks]
print('the number of weeks is: ', len(dates))
print(dates)

In [None]:
# remove quotation marks
from pyspark.sql.functions import udf

def NL_crm_sanitize_id(in_id):
    """Remove one double quote from each end of ``in_id``, if present.

    ``'"xxx"'`` becomes ``'xxx'``; a quote on only one side is still removed from that
    side. Other characters (e.g. brackets) are left alone. Falsy input (``None``,
    ``''``) is returned unchanged.
    """
    if not in_id:
        return in_id
    if in_id[:1] == '"':
        in_id = in_id[1:]
    if in_id[-1:] == '"':
        in_id = in_id[:-1]
    return in_id

# Spark UDF wrapping NL_crm_sanitize_id; its results are typed as strings.
f_udf = udf(NL_crm_sanitize_id, returnType=StringType())

In [None]:
df = email_react

# Strip surrounding double quotes from the values of every column with f_udf.
# `df` keeps a reference to the frame before sanitising. The loop variable is not
# named `col`, so pyspark's `col` function (from the star import) is not rebound to a
# column-name string for the rest of the notebook.
for column_name in email_react.columns:
    email_react = email_react.withColumn(column_name, f_udf(column_name))

In [None]:
# Remove quotes from the column names.

def rename_col(df):
    """Return ``df`` with leading/trailing double quotes stripped from every column name.

    NOTE(review): this reuses the name of the suffix-adding rename_col defined earlier
    in the notebook and replaces it once this cell has run.
    """
    renamed = df
    for old_name in df.schema.names:
        renamed = renamed.withColumnRenamed(old_name, old_name.strip('"'))
    return renamed


In [None]:
from pyspark.sql.functions import lit,year,weekofyear,col,month,year,concat
def get_index(df, columns, sqlContext):
    """Compare each province/week with the same week number one year later.

    Self-joins ``df`` on province and week number where the second row's year is the
    first row's year + 1. For every metric column ``c`` it adds ``ratio<c>``
    (s2.c / s1.c) and ``index<c>`` (s2.c - s1.c).

    :param df: DataFrame with ``province``, ``year``, ``weekN`` and the ``columns``.
    :param columns: names of the metric columns to compare (may be empty).
    :param sqlContext: SQLContext used to run the query.
    :returns: DataFrame with province, year1, weekN, year2 and the ratio/index columns.
    """
    # NOTE: this (re)defines the temporary view "sales" and drops it afterwards, so a
    # view of that name created elsewhere is replaced and removed.
    df.createOrReplaceTempView("sales")
    # Each metric contributes ", ratio..., index..."; an empty list adds nothing, so
    # the SELECT list never ends with a dangling comma.
    metric_exprs = "".join(
        ", s2.{0}/s1.{0} as ratio{0}, s2.{0}-s1.{0} as index{0}".format(c) for c in columns
    )
    query = ("SELECT s1.province, s1.year as year1, s1.weekN, s2.year as year2"
             + metric_exprs +
             " FROM sales s1"
             " JOIN sales s2 ON (s1.province = s2.province)"
             " WHERE s2.year = (s1.year+1) AND s1.weekN = s2.weekN")
    try:
        sales_index = sqlContext.sql(query)
    finally:
        # Drop the view even when the query fails.
        sqlContext.dropTempTable("sales")
    return sales_index



def count_month_year(df, date_column, min_year=2015, max_year=2017):
    """Show how many rows fall in each calendar month of ``date_column``.

    Adds ``month``, ``year`` and ``month_year`` columns; ``month_year`` is formatted
    ``"<month>_<year>"``, e.g. ``"3_2016"``. Keeps the years ``min_year``..``max_year``
    (inclusive, 2015-2017 by default) and prints up to 100 months in chronological
    order. Nothing is returned.
    """
    df = df.withColumn("month", month(date_column)).withColumn("year", year(date_column))
    df = df.withColumn("month_year", concat(col("month"), lit("_"), col("year")))
    df = df.filter("year >= {} AND year <= {}".format(min_year, max_year))
    # Sorting on the "<month>_<year>" string is not chronological ("10_2015" sorts
    # before "1_2015"), so sort on the numeric year and month and show only the label.
    (df.groupby("year", "month", "month_year").count()
       .sort("year", "month")
       .select("month_year", "count")
       .show(100, False))
    
    
def date_weekN_year(df, date_column):
    """Return ``df`` with ``weekN`` (week of year) and ``year`` taken from ``date_column``."""
    with_week = df.withColumn("weekN", weekofyear(date_column))
    return with_week.withColumn("year", year(date_column))
    

Possible sources of features:
=============================

- new_PreviousProductUsed2Name
- new_CurrentProductUsed2Name
- new_CitySourceName
- new_LocationName
- OwnerId (codes), OwnerIdName (mainly Friso Moments)
- new_childrenId (KEY)
- statuscode
- new_name
- new_child_prod_used
- new_ChildStage (very good one)
- new_EntryAtStage
- new_Gender
- new_Customer
- new_ChildAge (good one)
- new_ChildStage (good one)
- new_PreviousProductUsed
- new_DataSource
- new_DataType1
- new_DataType2
- new_MethodCollectData
- new_CitySource (code)
- new_Location
- new_CurrentProductUsed2
- new_PreviousProductUsed2_PackageSize
- new_CurrentProductUsed2
- new_PreviousProductUsed2_PackageSize



# ------- INIT & LOADING -------

In [1]:
# All imports here

from pyspark.sql.functions import * 
from pyspark.sql.types import * 

In [2]:
# Init Spark Session

# Reuse the notebook's existing SparkSession if there is one; otherwise build it.
if 'spark' in globals():
    print("SparkSession already exists")
else:
    from pyspark.sql import SparkSession

    spark = (SparkSession.builder
             .appName("Datascience")
             .getOrCreate())
    print("SparkSession created")

SparkSession already exists


In [3]:
# Load the cleaned children data and show its schema.
CHILDREN_CLEAN_PATH = "maprfs:///test_zone/ds_playground/projects/marketing_conversion/1_cleaning/data/children_new"
children_df = spark.read.parquet(CHILDREN_CLEAN_PATH)
children_df.printSchema()

root
 |-- new_CustomerName: string (nullable = true)
 |-- new_PreviousProductUsed2Name: string (nullable = true)
 |-- new_CurrentProductUsed2Name: string (nullable = true)
 |-- new_CitySourceName: string (nullable = true)
 |-- new_MedicalRepName: string (nullable = true)
 |-- new_LocationName: string (nullable = true)
 |-- new_KeyinAgentYomiName: string (nullable = true)
 |-- CreatedByName: string (nullable = true)
 |-- CreatedByYomiName: string (nullable = true)
 |-- ModifiedByName: string (nullable = true)
 |-- ModifiedByYomiName: string (nullable = true)
 |-- new_KeyinAgentName: string (nullable = true)
 |-- OwnerId: string (nullable = true)
 |-- OwnerIdName: string (nullable = true)
 |-- OwnerIdYomiName: string (nullable = true)
 |-- OwnerIdDsc: string (nullable = true)
 |-- OwningUser: string (nullable = true)
 |-- new_childrenId: string (nullable = true)
 |-- CreatedOn: string (nullable = true)
 |-- CreatedBy: string (nullable = true)
 |-- ModifiedBy: string (nullable = true)
 |-

In [4]:
# Candidate feature columns whose value distributions are inspected below.
# Each column is listed once (a repeated entry would just print the same table twice).
# 'new_PreviousProductUsed2_PakageSize' keeps its 'Pakage' spelling — presumably the
# source column name; TODO confirm against the schema.
ft_list = ['new_CurrentProductUsed2Name', 'new_CitySourceName',
           'OwnerIdName', 'new_childrenId', 'statecode',
           'new_CurrentChildStage', 'new_EntryAtStage', 'new_ChildAge',
           'new_DataType1', 'new_CitySource',
           'new_PreviousProductUsed2_PakageSize', 'new_CurrentProductUsed2']

In [6]:
# Total number of rows in the loaded children data.
children_df.count()

2830262

In [5]:
def get_counts_group(df, column, n=10):
    """Display the ``n`` most frequent values of ``column`` in ``df`` with their counts.

    The table is printed via ``DataFrame.show`` without truncation; nothing is returned.

    :param df: DataFrame to inspect.
    :param column: column to group by.
    :param n: number of rows to show (default 10).
    """
    df.groupBy(column).count().sort("count", ascending=False).show(n, False)


In [20]:
# Show the 10 most frequent values of every candidate feature.
# The loop variable is deliberately not called `col`: that would rebind pyspark's `col`
# function (from `from pyspark.sql.functions import *`) to a string and break later
# cells such as df.withColumn('child_stage', child_stage(col('new_CurrentChildStage'))).
for feature in ft_list:
    get_counts_group(children_df, feature)

+-----------------------------------+-------+
|new_CurrentProductUsed2Name        |count  |
+-----------------------------------+-------+
|                                   |1082569|
|Sữa Mẹ                             |202254 |
|Fresh milk                         |135741 |
|-                                  |122322 |
|Friso Gold 3 12x900gr              |103949 |
|Friso Gold 1 24x400gr              |96469  |
|Friso Gold 3 24x400gr              |82585  |
|Others                             |79650  |
|Non Use                            |72471  |
|IMP FRISOLAC GOLD 1 24X400G SUNRISE|62805  |
+-----------------------------------+-------+
only showing top 10 rows

+------------------+-------+
|new_CitySourceName|count  |
+------------------+-------+
|                  |1071694|
|HCM               |559249 |
|Hà Nội            |303417 |
|North (Others)    |237564 |
|Central (Others)  |135900 |
|East (Others)     |131250 |
|Mekong (Others)   |113551 |
|Biên Hòa          |68481  |
|Đà Nẵng   

In [None]:
### Comments:
- statecode is mainly 0
- new_CurrentChildStage and new_ChildAge carry the same information
- new_EntryAtStage
- new_DataType1


# ---- FEATURES ------

- 1. new_ChildAge and new_CurrentChildStage are the same
- 2. new_DataType1
- 3. 

## 1. Feature: child_stage

In [6]:
# Alias the loaded data as `df`; the following cells rebind `df` as they add feature columns.
df = children_df

In [25]:
# Compare the stage label with the free-text child age for the first 50 rows (untruncated).
df.select('new_CurrentChildStage', 'new_ChildAge').show(50, False)

+---------------------+-------------------------+
|new_CurrentChildStage|new_ChildAge             |
+---------------------+-------------------------+
|Stage 3 (13 - 24)    |1 year 5 months 8 days   |
|Stage 4 (25 - 48)    |4 years 10 days          |
|Stage 4 (25 - 48)    |3 years 30 days          |
|Stage 4 (25 - 48)    |2 years 5 months 5 days  |
|Stage 4 (25 - 48)    |3 years 4 months 28 days |
|Stage 3 (13 - 24)    |1 year 4 months 21 days  |
|Stage 4 (25 - 48)    |4 years 29 days          |
|Stage 3 (13 - 24)    |1 year 4 months 22 days  |
|Stage 4 (25 - 48)    |2 years 1 month 6 days   |
|Stage 3 (13 - 24)    |1 year 2 months 6 days   |
|Stage 4 (25 - 48)    |2 years 3 months 16 days |
|Stage 4 (25 - 48)    |2 years 2 months 26 days |
|Stage 2 (7 - 12)     |9 months 9 days          |
|Stage 4 (25 - 48)    |3 years 4 months 15 days |
|Stage 4 (25 - 48)    |2 years 9 months 8 days  |
|Stage 4 (25 - 48)    |3 years 7 months 8 days  |
|Stage 4 (25 - 48)    |3 years 3 months 28 days |


In [7]:
# First version of child_stage: a plain copy of new_CurrentChildStage
# (it is overwritten further down by the child_stage() mapping).
df = df.withColumn('child_stage', df['new_CurrentChildStage'])

In [8]:
# Check the copy against its source column (first 20 rows, untruncated).
df.select('child_stage', 'new_CurrentChildStage').show(20,False)

+-----------------+---------------------+
|child_stage      |new_CurrentChildStage|
+-----------------+---------------------+
|Stage 3 (13 - 24)|Stage 3 (13 - 24)    |
|Stage 4 (25 - 48)|Stage 4 (25 - 48)    |
|Stage 4 (25 - 48)|Stage 4 (25 - 48)    |
|Stage 4 (25 - 48)|Stage 4 (25 - 48)    |
|Stage 4 (25 - 48)|Stage 4 (25 - 48)    |
|Stage 3 (13 - 24)|Stage 3 (13 - 24)    |
|Stage 4 (25 - 48)|Stage 4 (25 - 48)    |
|Stage 3 (13 - 24)|Stage 3 (13 - 24)    |
|Stage 4 (25 - 48)|Stage 4 (25 - 48)    |
|Stage 3 (13 - 24)|Stage 3 (13 - 24)    |
|Stage 4 (25 - 48)|Stage 4 (25 - 48)    |
|Stage 4 (25 - 48)|Stage 4 (25 - 48)    |
|Stage 2 (7 - 12) |Stage 2 (7 - 12)     |
|Stage 4 (25 - 48)|Stage 4 (25 - 48)    |
|Stage 4 (25 - 48)|Stage 4 (25 - 48)    |
|Stage 4 (25 - 48)|Stage 4 (25 - 48)    |
|Stage 4 (25 - 48)|Stage 4 (25 - 48)    |
|Stage 4 (25 - 48)|Stage 4 (25 - 48)    |
|Stage 4 (25 - 48)|Stage 4 (25 - 48)    |
|Stage 5 (49 - )  |Stage 5 (49 - )      |
+-----------------+---------------

In [9]:

def child_stage(column):
    """Map ``column`` to a known child-stage label, or 'other'.

    Values equal to one of the stage labels below are returned unchanged, so every
    label (including 'Stage 5 (49 - )') matches the source value exactly, without
    extra whitespace. Any other value, null included, becomes 'other'.
    """
    stages = ['Stage 0 (Pregnancy)',
              'Stage 1 (0 - 6)',
              'Stage 2 (7 - 12)',
              'Stage 3 (13 - 24)',
              'Stage 4 (25 - 48)',
              'Stage 5 (49 - )']
    return when(column.isin(stages), column).otherwise('other')
    

In [10]:
# Replace child_stage with the normalised stage label (unrecognised values become 'other').
# NOTE(review): `col` must still be pyspark's function here; a loop elsewhere in the
# notebook that uses `col` as its loop variable would rebind it to a string.
df = df.withColumn('child_stage', child_stage(col('new_CurrentChildStage')))

In [11]:
# Distribution of the normalised child_stage values, most frequent first.
df.groupBy('child_stage').count().sort(desc('count')).show(20,False)

+-------------------+-------+
|child_stage        |count  |
+-------------------+-------+
|Stage 4 (25 - 48)  |1154067|
|Stage 5 (49 - )    |540734 |
|Stage 3 (13 - 24)  |509245 |
|Stage 2 (7 - 12)   |315706 |
|Stage 1 (0 - 6)    |280596 |
|other              |17151  |
|Stage 0 (Pregnancy)|12763  |
+-------------------+-------+



## 2. Feature: data_type
- the meaning of this column is unknown, but it might be useful (e.g. if it records whether the data comes from a hospital or some other source)

In [12]:
# Raw distribution of new_DataType1, most frequent first.
df.groupBy('new_DataType1').count().sort(desc('count')).show(20,False)

+-------------+-------+
|new_DataType1|count  |
+-------------+-------+
|             |1290134|
|1            |1278515|
|2            |261489 |
|null         |124    |
+-------------+-------+



In [17]:
def replace(column):
    """Keep the codes '1' and '2' of ``column``; every other value becomes 'blank'."""
    expr = None
    for code in ('1', '2'):
        is_code = column == code
        expr = when(is_code, code) if expr is None else expr.when(is_code, code)
    return expr.otherwise('blank')

df = df.withColumn('data_type', replace(col('new_DataType1')))

In [18]:
# Distribution after the mapping: everything except '1' and '2' is counted as 'blank'.
df.groupBy('data_type').count().sort(desc('count')).show(20,False)

+---------+-------+
|data_type|count  |
+---------+-------+
|blank    |1290258|
|1        |1278515|
|2        |261489 |
+---------+-------+



## 3. Feature: entry_at_stage
- the meaning of this column is unknown, but it might be useful

In [19]:
# Raw distribution of new_EntryAtStage, most frequent first.
df.groupBy('new_EntryAtStage').count().sort(desc('count')).show(20,False)

+-------------------+-------+
|new_EntryAtStage   |count  |
+-------------------+-------+
|1                  |1127392|
|                   |1001863|
|3                  |400210 |
|4                  |163332 |
|2                  |114967 |
|0                  |22412  |
|null               |79     |
|05/27/2016 17:00:00|1      |
|10/05/2013 17:00:00|1      |
|04/18/2016 17:00:00|1      |
|03/03/2015 17:00:00|1      |
|06/24/2013 17:00:00|1      |
|04/17/2014 17:00:00|1      |
|04/11/2015 17:00:00|1      |
+-------------------+-------+



In [22]:
def replaceE(column):
    """Keep the entry-stage codes '0'-'4' of ``column``; every other value becomes 'mess'."""
    expr = None
    for code in ('0', '1', '2', '3', '4'):
        matches = column == code
        expr = when(matches, code) if expr is None else expr.when(matches, code)
    return expr.otherwise('mess')

df = df.withColumn('entry_at_stage', replaceE(col('new_EntryAtStage')))

In [23]:
# Distribution after the mapping: anything other than '0'-'4' is counted as 'mess'.
df.groupBy('entry_at_stage').count().sort(desc('count')).show(20,False)

+--------------+-------+
|entry_at_stage|count  |
+--------------+-------+
|1             |1127392|
|mess          |1001949|
|3             |400210 |
|4             |163332 |
|2             |114967 |
|0             |22412  |
+--------------+-------+



## 4. Feature: current_product_used

In [24]:
# Raw distribution of new_CurrentProductUsed2Name (top 50, untruncated).
df.groupBy('new_CurrentProductUsed2Name').count().sort(desc('count')).show(50,False)

+--------------------------------------+-------+
|new_CurrentProductUsed2Name           |count  |
+--------------------------------------+-------+
|                                      |1082569|
|Sữa Mẹ                                |202254 |
|Fresh milk                            |135741 |
|-                                     |122322 |
|Friso Gold 3 12x900gr                 |103949 |
|Friso Gold 1 24x400gr                 |96469  |
|Friso Gold 3 24x400gr                 |82585  |
|Others                                |79650  |
|Non Use                               |72471  |
|IMP FRISOLAC GOLD 1 24X400G SUNRISE   |62805  |
|IMP FRISO GOLD 4 24X400G SUNRISE      |36634  |
|Grow Plus+ (>2y) (suy DD)             |33835  |
|IMP FRISO GOLD 4 12X900G SUNRISE      |30077  |
|Friso Gold 4 12x900gr                 |29910  |
|Friso Gold 2 24x400gr                 |27953  |
|Friso Gold 2 12x900gr                 |27808  |
|Enfagrow A+ (1y-3y)                   |26723  |
|Pediasure          

In [89]:
# Group the free-text product names into product families.
# '.*X.*' matches any string containing X (e.g. '.*xxx.*' matches 'sthgxxxsthgxxx'), so a
# matching value is replaced as a whole by the label. Rules run in list order on the
# already-rewritten column. '^\s*$' matches an empty or whitespace-only value.
# Raw strings keep '\s' from being an invalid Python escape sequence.
# NOTE(review): null values stay null; regexp_replace does not turn them into 'Blank'.
current_product_rules = [
    ('.*Friso Gold 1.*|.*Friso 1.*|.*GOLD 1.*', 'Friso Gold 1'),
    ('.*Friso Gold 2.*|.*GOLD 2.*', 'Friso Gold 2'),
    ('.*Friso Gold 3.*|.*Friso 3.*|.*GOLD 3.*', 'Friso Gold 3'),
    ('.*Friso Gold 4.*|.*GOLD 4.*', 'Friso Gold 4'),
    ('.*DUTCH LADY.*|.*Dutch Lady.*', 'Dutch Lady'),
    ('.*Dielac.*', 'Dielac'),
    ('.*Similac.*', 'Similac'),
    ('.*Friso Gold Pedia.*', 'Friso Gold Pedia'),
    ('.*Nuti.*', 'Nuti'),
    ('.*Nan.*', 'Nan'),
    ('.*Dutch Baby.*|.*Dutchbaby.*', 'Dutch Baby'),
    ('.*Grow.*', 'Grow'),
    ('.*Gain.*', 'Gain'),
    ('.*Dumex.*', 'Dumex'),
    (r"^\s*$", 'Blank'),
    # A lone "-" (optionally padded) means "no value". The pattern is anchored at both
    # ends so names that merely start with "-" are not turned into "Blank...".
    (r"^\s*-\s*$", 'Blank'),
]

# The first rule reads the raw column; every later rule reads the rewritten one.
source_column = 'new_CurrentProductUsed2Name'
for pattern, label in current_product_rules:
    df = df.withColumn('current_product_used', regexp_replace(source_column, pattern, label))
    source_column = 'current_product_used'




In [90]:
# Distribution of the grouped product names (top 50, untruncated).
df.groupBy('current_product_used').count().sort(desc('count')).show(50,False)

+-------------------------------------+-------+
|current_product_used                 |count  |
+-------------------------------------+-------+
|Blank                                |1204891|
|Friso Gold 3                         |243927 |
|Sữa Mẹ                               |202254 |
|Friso Gold 1                         |194544 |
|Fresh milk                           |135741 |
|Dielac                               |119788 |
|Friso Gold 4                         |107177 |
|Others                               |79650  |
|Non Use                              |72471  |
|Grow                                 |68959  |
|Friso Gold 2                         |64690  |
|Dutch Lady                           |45140  |
|Similac                              |38969  |
|Nuti                                 |26783  |
|Enfagrow A+ (1y-3y)                  |26723  |
|Pediasure                            |26187  |
|Nan                                  |21160  |
|Dutch Baby                           |1

### Comment: this feature cannot be used directly in modelling, but it should still be useful!

## 5. Feature: city_name

In [85]:
# Raw distribution of new_CitySourceName (top 50, untruncated).
df.groupBy('new_CitySourceName').count().sort(desc('count')).show(50,False)

+------------------+-------+
|new_CitySourceName|count  |
+------------------+-------+
|                  |1071694|
|HCM               |559249 |
|Hà Nội            |303417 |
|North (Others)    |237564 |
|Central (Others)  |135900 |
|East (Others)     |131250 |
|Mekong (Others)   |113551 |
|Biên Hòa          |68481  |
|Đà Nẵng           |56949  |
|Cần Thơ           |52907  |
|Hải Phòng         |43871  |
|-                 |34512  |
|Nha Trang         |20917  |
+------------------+-------+



In [106]:
# city_name: new_CitySourceName with empty/whitespace-only values and a lone "-"
# (optionally padded) mapped to "Blank". Both patterns are anchored at both ends, so
# names that merely start with "-" are kept. Raw strings keep "\s" from being an
# invalid Python escape sequence.
df = df.withColumn('city_name', regexp_replace('new_CitySourceName', r"^\s*$", "Blank"))
df = df.withColumn('city_name', regexp_replace('city_name', r"^\s*-\s*$", "Blank"))

In [108]:
# Distribution of the cleaned city names (top 50, untruncated).
df.groupBy('city_name').count().sort(desc('count')).show(50,False)

+----------------+-------+
|city_name       |count  |
+----------------+-------+
|Blank           |1106206|
|HCM             |559249 |
|Hà Nội          |303417 |
|North (Others)  |237564 |
|Central (Others)|135900 |
|East (Others)   |131250 |
|Mekong (Others) |113551 |
|Biên Hòa        |68481  |
|Đà Nẵng         |56949  |
|Cần Thơ         |52907  |
|Hải Phòng       |43871  |
|Nha Trang       |20917  |
+----------------+-------+



In [109]:
# Inspect the schema of df after the feature columns were added.
df.printSchema()

root
 |-- new_CustomerName: string (nullable = true)
 |-- new_PreviousProductUsed2Name: string (nullable = true)
 |-- new_CurrentProductUsed2Name: string (nullable = true)
 |-- new_CitySourceName: string (nullable = true)
 |-- new_MedicalRepName: string (nullable = true)
 |-- new_LocationName: string (nullable = true)
 |-- new_KeyinAgentYomiName: string (nullable = true)
 |-- CreatedByName: string (nullable = true)
 |-- CreatedByYomiName: string (nullable = true)
 |-- ModifiedByName: string (nullable = true)
 |-- ModifiedByYomiName: string (nullable = true)
 |-- new_KeyinAgentName: string (nullable = true)
 |-- OwnerId: string (nullable = true)
 |-- OwnerIdName: string (nullable = true)
 |-- OwnerIdYomiName: string (nullable = true)
 |-- OwnerIdDsc: string (nullable = true)
 |-- OwningUser: string (nullable = true)
 |-- new_childrenId: string (nullable = true)
 |-- CreatedOn: string (nullable = true)
 |-- CreatedBy: string (nullable = true)
 |-- ModifiedBy: string (nullable = true)
 |-

In [110]:
# Keep the child id and the engineered feature columns only.
feature_columns = ["new_childrenId", "child_stage", "data_type",
                   "entry_at_stage", "current_product_used", "city_name"]
children_ft = df.select(*feature_columns)

# ------- SAVING -------

In [111]:
# Persist the features as parquet, replacing any previous output at that path.
children_features_path = ("maprfs:///test_zone/ds_playground/projects/"
                          "marketing_conversion/2_features/data/children_features")
children_ft.write.mode("overwrite").parquet(children_features_path)
print('Saved successfully!')

Saved successfully!
