### Load packages and configure parameters

In [2]:
from pyspark.sql import functions as f
from pyspark.sql import Window

# Country code the filters in the cells below are restricted to; it is compared
# against upper-cased countryCode columns, so keep it in upper case.
countryCode = 'SG'

### Select timeframes

In [4]:
# Lower bound (year-month) for "bought in the last 6 months": the value of
# maxYearMonthMin6 stored in the time-frame table for the selected country.
# NOTE(review): presumably the latest sales month minus six months — confirm.
timeFramesCountry = (
    spark.table('dev_derived_reco.sales_time_frames')
    .where(f.upper(f.col('countryCode')) == countryCode)
)
month1 = timeFramesCountry.select('maxYearMonthMin6').collect()[0][0]

In [5]:
from datetime import date
from dateutil.relativedelta import relativedelta

# Lower bound (yyyymm) for "visited in the last 12 months".
# NB: despite its name, `today` holds the date twelve months before the run date.
today = date.today() + relativedelta(months=-12)
month2 = today.strftime("%Y%m")  # strftime already returns a str

print(month1, month2)


### Select buying Operators

In [7]:

# Sales lines of the selected country whose transaction month is month1 or later.
# The first six characters of transactionDateString are compared with month1,
# i.e. the string is assumed to start with yyyymm — TODO confirm the format.
isRecentSale = f.substring(f.col('transactionDateString'), 1, 6) >= month1
isCountrySale = f.col('countryCode') == countryCode

dfBuying = (
    spark.table('dev_derived_reco.sales_max_corrected_filtered')
    .where(isRecentSale)
    .where(isCountrySale)
)

# Sanity check: cut-off month and number of distinct buying operators
print(month1, dfBuying.select('operatorOhubId').distinct().count())


### Select ufs.com registered operators

In [9]:
# Operators of the selected country whose sourceName is 'emakina'
# (case-insensitive match); ohubID is renamed to the key used in the other frames.
dfEmakina = (
    spark.table('data_datascience_prod.operators')
    .where(f.lower(f.col('sourceName')) == 'emakina')
    .where(f.upper(f.col('countryCode')) == countryCode)
    .withColumnRenamed('ohubID', 'operatorOhubId')
)

# Sanity check: number of distinct registered operators
print(dfEmakina.select('operatorOhubId').distinct().count())

### Select visited or called operators

In [11]:
# Sales visits / telesales calls of the selected country from month2 onwards.
# activityDate is reduced to yyyymm by taking its first seven characters and
# stripping the dash (assumes a yyyy-mm-... layout — TODO confirm).
activityMonth = f.regexp_replace(f.substring(f.col('activityDate'), 1, 7), '-', '')

dfVisitedOperators = (
    spark.table('dev_sources_ohub.raw_activities')
    .where(f.lower(f.col('actionType')).isin('salesvisit', 'telesalescall'))
    .where(activityMonth >= month2)
    # Filter on countryCode BEFORE projecting: the column is not part of the
    # selected output, so filtering afterwards only works through implicit
    # analyzer resolution of the dropped column.
    .where(f.col('countryCode') == countryCode)
    .select('operatorOhubId', 'actionType', 'activityDate')
)

# Sanity check: earliest activity date kept and number of distinct operators
print(
    dfVisitedOperators.agg(f.min('activityDate')).collect()[0][0],
    dfVisitedOperators.select('operatorOhubId').distinct().count(),
)

In [12]:
# Show which action types are actually present after the visit/call filter
display(dfVisitedOperators.select('actionType').distinct())

actionType
SalesVisit


### Create dataframe with 'Single Sign of Life' operators

In [14]:
# An operator counts as active when it appears in at least one of the three
# sources: buying, registered (emakina) or visited/called.
# The result may contain duplicate ids; it is only used for counting and semi-joins.
activeSources = [dfBuying, dfEmakina, dfVisitedOperators]

dfActive = activeSources[0].select('operatorOhubID')
for dfSource in activeSources[1:]:
    dfActive = dfActive.union(dfSource.select('operatorOhubID'))

# Sanity check: number of distinct active operators
print(dfActive.select('operatorOhubId').distinct().count())

In [15]:
# Master data of the active operators of the selected country.
# regexp_replace only yields NULL for NULL input, so the two name / zipcode
# filters drop rows with a missing name or zip code (values that become empty
# after cleaning are kept).
dfActiveOps = (
    spark.table('data_datascience_prod.operators')
    .select('countryCode', 'channel', 'name', 'zipCode', 'street', 'housenumber',
            'houseNumberExtension', 'city', 'ohubID', 'concatId')
    .where(f.col('countryCode') == countryCode)
    .where(f.regexp_replace(f.col('name'), '[^0-9a-zA-Z]', '').isNotNull())
    .where(f.regexp_replace(f.col('zipcode'), '[^0-9a-zA-Z]', '').isNotNull())
    .withColumnRenamed('ohubID', 'operatorOhubId')
    .withColumnRenamed('concatID', 'operatorConcatID')
    # keep only operators that showed a sign of life
    .join(dfActive, on=['operatorOhubID'], how='left_semi')
)

# Within one operatorOhubId, take every attribute from the row with the lowest
# operatorConcatID so that all rows of that operator carry the same values.
w1 = Window.partitionBy(f.col('operatorOhubId')).orderBy(f.col('operatorConcatID'))

# houseNumberExtension is harmonised together with the other address fields;
# previously it was left per-row (housenumber was processed three times instead),
# so the row picked by the later deduplication was arbitrary for that column.
harmonisedColumns = ['countryCode', 'channel', 'name', 'zipCode', 'street',
                     'housenumber', 'houseNumberExtension', 'city']
for harmonisedColumn in harmonisedColumns:
    dfActiveOps = dfActiveOps.withColumn(
        harmonisedColumn, f.first(f.col(harmonisedColumn)).over(w1))

dfActiveOps = dfActiveOps.withColumn('city', f.initcap(f.col('city')))

# operatorConcatID must be overwritten LAST: it is the ordering key of w1, and
# once it is constant per operator the window order is no longer meaningful.
dfActiveOps = dfActiveOps.withColumn(
    'operatorConcatID', f.first(f.col('operatorConcatID')).over(w1))


### Test the active operator dataframe for the visited and called operators

In [17]:
# Test set: distinct operators with a sales visit / telesales call of the
# selected country from month2 onwards.
# The countryCode filter is applied before the projection (the column is not in
# the output) and the former intermediate three-column select is dropped because
# only operatorOhubID is kept anyway.
dftest = (
    spark.table('dev_sources_ohub.raw_activities')
    .where(f.lower(f.col('actionType')).isin('salesvisit', 'telesalescall'))
    .where(f.regexp_replace(f.substring(f.col('activityDate'), 1, 7), '-', '') >= month2)
    .where(f.col('countryCode') == countryCode)
    .select('operatorOhubID')
    .distinct()
)

# Alphanumeric-only versions of name and zip code
cleanName = f.regexp_replace(f.col('name'), '[^0-9a-zA-Z]', '')
cleanZipcode = f.regexp_replace(f.col('zipcode'), '[^0-9a-zA-Z]', '')

# One partition per cleaned name x zip code (NULL treated as ''), ordered by concatId
w2 = Window\
    .partitionBy(f.coalesce(cleanName, f.lit('')), f.coalesce(cleanZipcode, f.lit('')))\
    .orderBy(f.col('concatId'))

# Operators expected to survive: lowest concatId per name x zip code, with
# name and zip code present. The rank filter runs before the NULL filters.
dfExpectedOperators = (
    spark.table('data_datascience_prod.operators')
    .where(f.col('countryCode') == countryCode)
    .withColumn('rank', f.dense_rank().over(w2))
    .where(f.col('rank') == 1)
    .drop(f.col('rank'))
    .where(cleanName.isNotNull())
    .where(cleanZipcode.isNotNull())
    .select(f.col('ohubID').alias('operatorOhubID'))
)

dftest = dftest.join(dfExpectedOperators, on=['operatorOhubID'], how='left_semi')

# Expected number of visited/called operators vs. how many are found in dfActiveOps
print(
    dftest.select('operatorOhubID').distinct().count(),
    dftest.join(dfActiveOps, on=['operatorOhubID'], how='left_semi')
          .select('operatorOhubID').distinct().count(),
)


### Test the active operator dataframe for the ufs.com registered operators

In [19]:
# Test set: distinct emakina-sourced operators of the selected country
dfTest2 = (
    spark.table('data_datascience_prod.operators')
    .where(f.lower(f.col('sourceName')) == 'emakina')
    .where(f.upper(f.col('countryCode')) == countryCode)
    .withColumnRenamed('ohubID', 'operatorOhubId')
    .select('operatorOhubId')
    .distinct()
)

# Restrict to operators of the country that have both a name and a zip code
dfNamedOperators2 = (
    spark.table('data_datascience_prod.operators')
    .where(f.col('countryCode') == countryCode)
    .where(f.regexp_replace(f.col('name'), '[^0-9a-zA-Z]', '').isNotNull())
    .where(f.regexp_replace(f.col('zipcode'), '[^0-9a-zA-Z]', '').isNotNull())
    .select(f.col('ohubID').alias('operatorOhubID'))
)
dfTest2 = dfTest2.join(dfNamedOperators2, on=['operatorOhubID'], how='left_semi')

# Expected number of registered operators vs. how many are found in dfActiveOps
foundInActive2 = dfTest2.join(dfActiveOps, on=['operatorOhubID'], how='left_semi')
print(
    dfTest2.select('operatorOhubID').distinct().count(),
    foundInActive2.select('operatorOhubID').distinct().count(),
)


### Test the active operator dataframe for the buying operators

In [21]:
# Test set: distinct operators of the selected country with sales from month1 onwards
dfTest3 = (
    spark.table('dev_derived_reco.sales_max_corrected_filtered')
    .where(f.substring(f.col('transactionDateString'), 1, 6) >= month1)
    .where(f.col('countryCode') == countryCode)
    .select('operatorOhubId')
    .distinct()
)

# Restrict to operators of the country that have both a name and a zip code
dfNamedOperators3 = (
    spark.table('data_datascience_prod.operators')
    .where(f.col('countryCode') == countryCode)
    .where(f.regexp_replace(f.col('name'), '[^0-9a-zA-Z]', '').isNotNull())
    .where(f.regexp_replace(f.col('zipcode'), '[^0-9a-zA-Z]', '').isNotNull())
    .select(f.col('ohubID').alias('operatorOhubID'))
)
dfTest3 = dfTest3.join(dfNamedOperators3, on=['operatorOhubID'], how='left_semi')

# Expected number of buying operators vs. how many are found in dfActiveOps
foundInActive3 = dfTest3.join(dfActiveOps, on=['operatorOhubID'], how='left_semi')
print(
    dfTest3.select('operatorOhubID').distinct().count(),
    foundInActive3.select('operatorOhubID').distinct().count(),
)


### Deduplicate the dataframe at name × zip code level (after making name and zip code alphanumeric)

In [23]:
# Before deduplication: total rows, then distinct operator ids,
# distinct name/zip code pairs and distinct concat ids.
keySets = (['operatorOhubId'], ['name', 'zipCode'], ['operatorConcatID'])
distinctCountsBefore = [dfActiveOps.select(*keys).distinct().count() for keys in keySets]
print(dfActiveOps.count(), *distinctCountsBefore)

In [24]:
# Alphanumeric-only versions of name and zip code; NULL is mapped to ''
# so that missing values share one partition.
nameKey = f.coalesce(f.regexp_replace(f.col('name'), '[^0-9a-zA-Z]', ''), f.lit(''))
zipcodeKey = f.coalesce(f.regexp_replace(f.col('zipcode'), '[^0-9a-zA-Z]', ''), f.lit(''))

# Within each name x zip code group, order by concat id and then street (NULL as '')
w2 = (
    Window
    .partitionBy(nameKey, zipcodeKey)
    .orderBy(f.col('operatorConcatID'), f.coalesce(f.col('street'), f.lit('')))
)

# Keep only the first row of every name x zip code group
dfActiveOps = (
    dfActiveOps
    .withColumn('rank', f.row_number().over(w2))
    .where(f.col('rank') == 1)
    .drop(f.col('rank'))
    .distinct()
)

dfActiveOps.show()

In [25]:
# After deduplication: total rows, then distinct operator ids,
# distinct name/zip code pairs and distinct concat ids.
totalRowsAfter = dfActiveOps.count()
operatorIdsAfter = dfActiveOps.select('operatorOhubId').distinct().count()
nameZipPairsAfter = dfActiveOps.select('name', 'zipCode').distinct().count()
concatIdsAfter = dfActiveOps.select('operatorConcatID').distinct().count()

print(totalRowsAfter, operatorIdsAfter, nameZipPairsAfter, concatIdsAfter)
      

In [26]:
# Persist the deduplicated operators in a per-country table, replacing any earlier run
ssolTargetTable = "data_user_hien.operators_ssol_" + str(countryCode)
dfActiveOps.write.mode("overwrite").saveAsTable(ssolTargetTable)

In [27]:
# Validate the table written by this notebook. The former %sql cell queried
# data_user_tim.operators_ssol_tr, i.e. another user's schema and another
# country, so it never checked the output of this run. The table name is now
# derived from countryCode exactly as in the write step.
ssolCheckTable = "data_user_hien.operators_ssol_" + str(countryCode)

display(spark.sql(
    "Select count(*), count(distinct operatorOhubID), "
    "count(distinct name || ' / ' || zipcode) "
    "From " + ssolCheckTable
))
