In [1]:
from pyspark.sql.functions import isnan, when, count, col, split, udf, lower
from pyspark.sql.functions import ltrim, rtrim
from pyspark.sql.types import IntegerType, StringType
from urlparse import urlparse

In [2]:
def create_dataframe_with_filter(files_path, column, value_filter_list):
  """Build a filtered dataframe from the parquet partition files under `files_path`.

  Every entry of dbutils.fs.ls(files_path) whose path contains 'part-' is read
  with spark.read.parquet, filtered to rows where `column` is in
  `value_filter_list`, and unioned into a single dataframe.

  files_path: directory passed to dbutils.fs.ls.
  column: name of the column to filter on.
  value_filter_list: values to keep (passed to Column.isin).

  Returns the unioned (cached) Spark DataFrame.
  Raises ValueError if no 'part-' files are found under files_path.
  """
  # keep only the data partition files (skips _SUCCESS / metadata entries)
  filenames = [info.path for info in dbutils.fs.ls(files_path) if 'part-' in info.path]
  if not filenames:
    raise ValueError("No 'part-' files found under %s" % files_path)
  # single-argument print() behaves the same under Python 2 and 3
  print("Number of partition files: %d" % len(filenames))
  print("\n".join(filenames))  # file list "pretty" print

  df = spark.read.parquet(filenames[0]).where(col(column).isin(value_filter_list)).cache()
  print("First partition size: %d" % df.count())
  for path in filenames[1:]:  # iterate from 2nd file
    print("Adding %s ..." % path)
    df_part = spark.read.parquet(path).where(col(column).isin(value_filter_list))
    print("Number of rows added: %d" % df_part.count())
    df = df.union(df_part).cache()  # cached because it is reused by the next union / count
  print("Final dataframe size: %d" % df.count())
  return df

In [3]:
# Impressions/ Clicks merge function

def merge_imp_clicks(impressions, clicks):
  """Flag which impressions were clicked.

  impressions are de-duplicated on auctionId and left-joined to the distinct
  click impIds (renamed 'clicked'); 'clicked' is then replaced by 1.0 when a
  matching click exists and 0.0 otherwise.
  """
  click_ids = (clicks
               .select('impId')
               .dropDuplicates(subset=['impId'])
               .withColumnRenamed('impId', 'clicked'))
  unique_imps = impressions.dropDuplicates(subset=["auctionId"])
  joined = unique_imps.join(click_ids, [impressions.auctionId == click_ids.clicked], 'left')
  return joined.withColumn('clicked', click_ids.clicked.isNotNull().cast('double'))

In [4]:
# Impressions/Clicks merge function (broadcast-join variant)
from pyspark.sql.functions import broadcast

def merge_imp_clicks2(impressions, clicks):
  """Same as merge_imp_clicks, but broadcasts the de-duplicated click ids.

  Returns impressions (unique per auctionId) with 'clicked' = 1.0 when the
  auctionId appears as a click impId, else 0.0.
  """
  click_ids = (clicks
               .select('impId')
               .dropDuplicates(subset=['impId'])
               .withColumnRenamed('impId', 'clicked'))
  unique_imps = impressions.dropDuplicates(subset=["auctionId"])
  # broadcast hint: ship the click-id table to every executor instead of shuffling
  joined = unique_imps.join(broadcast(click_ids), [impressions.auctionId == click_ids.clicked], 'left')
  return joined.withColumn('clicked', click_ids.clicked.isNotNull().cast('double'))

In [5]:
#Chung Meng
# Print Table of Column Missingness
def view_column_missingness(df):
  """Display a one-row table with the number of nulls in every column of df."""
  null_count_exprs = []
  for name in df.columns:
    # when(...) yields a value only for null rows, so count() counts the nulls
    null_count_exprs.append(count(when(col(name).isNull(), name)).alias(name))
  display(df.select(null_count_exprs))

# Print Structure of Data Frame with Custom Message 
def print_struct_df(df, msg='Data Frame Structure'):
  """Print msg, then the row and column counts of df, then its column names."""
  print(msg)
  n_rows = df.count()
  col_names = df.columns
  print(n_rows, len(col_names))
  print(col_names)

In [6]:
#Chung Meng
# adSize 'WxH' => adWidth, adHeight (temporary helper columns)
# adArea = adWidth * adHeight; only adArea is kept
def cleanup_adSize(df):
  """Add an integer adArea column computed from the 'WxH' string in adSize.

  adWidth / adHeight are created as intermediate integer columns and dropped
  again, so only adArea is added (adSize itself is kept).
  """
  dims = split(df['adSize'], 'x')
  with_dims = (df
               .withColumn('adWidth', dims.getItem(0).cast(IntegerType()))
               .withColumn('adHeight', dims.getItem(1).cast(IntegerType())))
  with_area = with_dims.withColumn('adArea', with_dims['adWidth'] * with_dims['adHeight'])
  return with_area.drop('adHeight', 'adWidth')
  

In [7]:
# Chung Meng
# ageGroup as Categorical Feature 
def ageToGroup(num):
  """Map a numeric age to an age-group label.

  None -> 'none'; age > 55 -> 'senior'; 35 < age <= 55 -> 'middle';
  20 < age <= 35 -> 'young'; 11 < age <= 20 -> 'teen'; 0 < age <= 11 -> 'kid';
  age <= 0 (e.g. the -999 fill value) -> 'none'.
  """
  if num is None:
    return 'none'
  # (exclusive lower bound, label), checked from oldest to youngest
  for lower_bound, label in ((55, "senior"), (35, "middle"), (20, "young"), (11, "teen"), (0, "kid")):
    if num > lower_bound:
      return label
  return "none"
      
      
def cleanup_age_category(df):
  """Replace the numeric 'age' column with a categorical 'ageGroup' column.

  Buckets come from ageToGroup applied as a StringType UDF; 'age' is dropped.
  """
  age_group_udf = udf(ageToGroup, StringType())
  return df.withColumn("ageGroup", age_group_udf("age")).drop('age')

In [8]:
#Chung Meng
def cleanup_age_numeric(df):
  """Fill missing 'age' with -999 and cast the column to int."""
  filled = df.na.fill({'age': -999})
  return filled.withColumn("age", filled['age'].cast("int"))

In [9]:
def cleanup_gender(df):
  """Replace missing 'gender' values with u'U' (unknown)."""
  return df.fillna({'gender': u'U'})

In [10]:
# Chung Meng: OS Cleanup 
#def cleanup_os(df):
#  df=df.na.fill({'os': u'unknown'})
#  return(df)

# osGroup: collapse raw OS strings into a categorical feature
def osToGroup(ostype):
    """Collapse a raw OS string into 'android', 'ios', 'windows', 'others' or 'unknown'.

    None -> 'unknown'; 'android' / 'ios' are kept unchanged; any value
    containing 'window' (windows, windows phone os, ...) -> 'windows';
    everything else -> 'others'. Matching is case-sensitive.
    """
    if ostype is None:
        return 'unknown'
    if ostype in ('android', 'ios'):  # android/ios unchanged
        return ostype
    # membership test, not str.find(): find() returns -1 (truthy) when the
    # substring is absent and 0 (falsy) when the string starts with it
    if 'window' in ostype:  # windows & windows phone os => windows
        return 'windows'
    return 'others'  # the rest => others
      
def cleanup_os(df):
  """Overwrite the 'os' column with its osToGroup category (StringType UDF)."""
  os_group_udf = udf(osToGroup, StringType())
  return df.withColumn("os", os_group_udf("os"))

In [11]:
from pyspark.sql.functions import concat
from pyspark.sql.types import StringType

def create_interaction(col1, col2):
  """Return a Column expression equal to col1 + '-' + col2.

  col1, col2: column names or Column objects (anything pyspark's concat accepts).
  Follows SQL concat semantics: the result is null when either input is null.
  """
  # lit is otherwise only imported in a later cell; import it here so this
  # helper does not depend on cell execution order
  from pyspark.sql.functions import lit
  return concat(col1, lit("-"), col2)
      
def _interaction_str(left, right):
  """Row-level 'left-right' string; None when either value is None (like SQL concat)."""
  if left is None or right is None:
    return None
  return u"%s-%s" % (left, right)

# Row-level UDF counterpart of create_interaction. create_interaction builds a
# Column expression and cannot be executed inside a UDF, so it is not wrapped here.
udfinteraction = udf(_interaction_str, StringType())

def interaction(df, col1, col2, new_col):
  """Add column new_col = col1 + '-' + col2 to df (see create_interaction)."""
  combined = create_interaction(col1, col2)
  return df.withColumn(new_col, combined)

In [12]:
def dedupe(df, subset):
  """Drop rows that are duplicates on the columns listed in `subset`."""
  return df.dropDuplicates(subset=subset)

In [13]:
def plot_evaluation_curve(models, labels, data, curve_type='roc', styles=('b--', 'g--', 'r--', 'c--', 'm--', 'y--', 'k--', 'w--')):
  '''
  Plot either the receiver operating characteristic (roc) or the
  precision-recall (pr) curve for each model on one set of axes.

  models: list of fitted pipeline models; all stages but the last are used to
    transform `data`, and the last stage's evaluate() summary supplies the
    `roc` / `pr` curve.
  labels: names of the models - used by the plot legend (paired with models).
  data: dataframe to evaluate on.
  curve_type: 'roc' for FPR/TPR; any other value plots Recall/Precision.
  styles: matplotlib format strings, reused cyclically when there are more
    models than styles.
  '''
  import matplotlib.pyplot as plt
  if curve_type == 'roc':
    curve_attr, xlabel, ylabel = 'roc', 'FPR', 'TPR'
  else:
    curve_attr, xlabel, ylabel = 'pr', 'Recall', 'Precision'

  fig, ax = plt.subplots()
  ax.set_xlim((0.0, 1.0))
  ax.set_ylim((0.0, 1.0))

  for i, (model, label) in enumerate(zip(models, labels)):
    # feature pipeline = every stage except the final estimator model
    feature_pipeline = model.copy()
    feature_pipeline.stages = feature_pipeline.stages[:-1]
    summary = model.stages[-1].evaluate(feature_pipeline.transform(data))
    curve = getattr(summary, curve_attr).toPandas()  # convert once, not once per axis
    ax.plot(curve.iloc[:, 0], curve.iloc[:, 1], styles[i % len(styles)], label=label)

  ax.set_xlabel(xlabel)
  ax.set_ylabel(ylabel)
  ax.legend(loc='best', shadow=True)
  display(fig)
  plt.close(fig)  # avoid accumulating open figures (and a duplicate inline render)

In [14]:
def take_a_sample(root_path = '/mnt/sito/capstone/2017-11/september/', child_path = './impressions/', start_row = 3, sample_ratio = 0.1):
  '''
  Sample each parquet entry under root_path/child_path and union the samples.

  The first `start_row` listing entries are skipped; each remaining entry is
  sampled without replacement at `sample_ratio` with seed 0.
  Returns None when there is nothing to sample.
  '''
  import os
  sampled = None
  for info in dbutils.fs.ls(os.path.join(root_path, child_path))[start_row:]:
    part = spark.read.parquet(info.path).sample(False, sample_ratio, seed=0)
    sampled = part if sampled is None else sampled.union(part)
  return sampled

In [15]:
def cleanup_country(df):
  """Keep only rows whose country is 'usa' (case-insensitive), then drop 'country'."""
  usa_rows = df.where(lower(df['country']) == 'usa')
  return usa_rows.drop('country')

In [16]:
def cleanup_timestamp(df):
  """Replace 'timestamp' with string-typed hour and weekday features.

  'timestamp' is cast to a Spark timestamp; 'timestamp_hour' (0-23) and
  'timestamp_weekday' (Python weekday(): Monday=0 .. Sunday=6) are added as
  strings and the original column is dropped. Null or unparseable timestamps
  give null features instead of failing the job.
  """
  from pyspark.sql.types import IntegerType
  from pyspark.sql.functions import hour

  def _weekday(dt):
    # a failed cast leaves None here; calling .weekday() on it would crash the UDF
    return None if dt is None else dt.weekday()

  weekday = udf(_weekday, IntegerType())
  df = df.withColumn('timestamp', df.timestamp.cast('timestamp'))
  return (df.withColumn('timestamp_hour', hour(df.timestamp).cast('string'))
            .withColumn('timestamp_weekday', weekday(df.timestamp).cast('string'))
            .drop('timestamp'))

In [17]:
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol, Param
from pyspark.sql.functions import udf, col, lit
from pyspark.sql.types import ArrayType, StringType
from pyspark import keyword_only  # This fails: from pyspark.ml.util import keyword_only. See https://github.com/maxpumperla/elephas/issues/60

class iabCategoriesTransformer(Transformer, HasInputCol, HasOutputCol):
    """Spark ML Transformer that one-hot encodes top-level IAB categories.

    The input column holds an array of category strings such as 'IAB1-2'.
    Sub-categories are stripped ('IAB1-2' -> 'IAB1'), then 26 integer columns
    IAB1..IAB26 are added (1 if the row's list contains that category, else
    0; null lists give 0) and the input column is dropped.

    NOTE(review): outputCol is accepted but unused; output names are always
    IAB1..IAB26.
    """

    @keyword_only
    def __init__(self, inputCol=None, outputCol=None):
        super(iabCategoriesTransformer, self).__init__()
        kwargs = self._input_kwargs  # populated by @keyword_only
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, inputCol=None, outputCol=None):
        """Set inputCol / outputCol (keyword arguments only)."""
        kwargs = self._input_kwargs
        return self._set(**kwargs)

    def _transform(self, dataset):
        out_cols = ["IAB" + str(i) for i in range(1, 27)]
        # The param value already is the column name. Going through the private
        # Column._jc handle and .encode('utf8') yields bytes on Python 3.
        in_col_name = self.getInputCol()

        def read_cat(newcolName, iabCategoriescol):
            """Return 1 if newcolName is in the category list, else 0 (0 for a null list)."""
            try:
                return int(newcolName in iabCategoriescol)
            except TypeError:  # argument of type 'NoneType' is not iterable
                return 0

        def remove_subcat(iabCategoriescol):
            """Strip sub-categories ('IAB1-2' -> 'IAB1'); None for a null list or element."""
            try:
                return [x.split("-")[0] for x in iabCategoriescol]
            except (TypeError, AttributeError):  # null list / null element
                return None

        udf_read_cat = udf(read_cat, IntegerType())
        udf_remove_subcat = udf(remove_subcat, ArrayType(StringType()))

        # remove sub-categories from the input column in place
        dataset = dataset.withColumn(in_col_name, udf_remove_subcat(col(in_col_name)))
        for name in out_cols:
            # pass the category name as a literal argument to the row-level check
            dataset = dataset.withColumn(name, udf_read_cat(lit(name), col(in_col_name)))
        # return dataframe without original column
        return dataset.drop(in_col_name)
    

In [18]:
from pyspark.sql.functions import udf, col, lit
from pyspark.sql.types import ArrayType, StringType, IntegerType

def read_cat(newcolName, iabCategoriescol):
  """Return 1 if newcolName is in the iabCategoriescol list, else 0.

  A null (None) category list yields 0.
  """
  try:
    return int(newcolName in iabCategoriescol)
  except TypeError:  # argument of type 'NoneType' is not iterable
    return 0

def remove_subcat(iabCategoriescol):
  """Strip sub-categories from a list of IAB codes ('IAB1-2' -> 'IAB1').

  Returns None when the list itself is None or contains a non-string element.
  """
  try:
    return [x.split("-")[0] for x in iabCategoriescol]
  except (TypeError, AttributeError):  # null list / null element
    return None

# Spark UDF wrappers around read_cat / remove_subcat, used by iab_encoder
udf_read_cat = udf(f=read_cat, returnType=IntegerType())
udf_remove_subcat = udf(f=remove_subcat, returnType=ArrayType(StringType()))
      
  
def iab_encoder(dataset, in_col):
  """One-hot encode the top-level IAB categories held in array column `in_col`.

  Sub-categories are stripped first; then integer columns IAB1..IAB26 are added
  (1 if the row's list contains that category, else 0) and `in_col` is dropped.
  """
  encoded = dataset.withColumn(in_col, udf_remove_subcat(in_col))
  for idx in range(1, 27):
    name = "IAB%d" % idx
    # the literal column carries the category name into the row-level UDF
    encoded = encoded.withColumn(name, lit(name))
    encoded = encoded.withColumn(name, udf_read_cat(name, in_col))
  return encoded.drop(in_col)

In [19]:
def bidReq_select(df):
  """Flatten selected fields of the nested bidRequest struct into 'bid_*' columns.

  Only the first impression object (bidRequest.imp[0]) is read.
  """
  req = df["bidRequest"]
  app = req["app"]
  device = req["device"]
  geo = device["geo"]
  imp = req["imp"][0]
  return df.select(
    df["timestamp"].alias("bid_tstamp"),
    req["id"].alias("bid_id"),
    app["bundle"].alias("bid_app_bundle"),
    app["domain"].alias("bid_app_domain"),
    app["publisher"]["name"].alias("bid_app_publisher_name"),
    req["bcat"].alias("bid_bcat"),
    geo["city"].alias("bid_city"),
    geo["metro"].alias("bid_metro_goog"),
    geo["type"].alias("bid_geo_type"),
    device["h"].alias("bid_device_height"),
    device["w"].alias("bid_device_width"),
    device["lmt"].alias("bid_device_lmt"),
    device["make"].alias("bid_device_make"),
    device["model"].alias("bid_device_model"),
    device["osv"].alias("bid_device_osv"),
    imp["secure"].alias("bid_imp_secure"),
    imp["instl"].alias("bid_imp_interstitial"),
    imp["bidfloor"].alias("bid_bidfloor"),
    imp["video"]["pos"].alias("bid_vid_pos"),
    imp["banner"]["pos"].alias("bid_ban_pos")
  )

def merge_imps_and_bids(imps, bids):
  """Left-join impressions to flattened bid-request fields on auctionId == bid_id.

  bidReq_select already aliases 'timestamp' to 'bid_tstamp', so no rename is
  needed. Rows are not de-duplicated here; drop duplicates afterwards
  (suggested subset = ["auctionId"]).
  """
  bid_fields = bidReq_select(bids)
  return imps.join(bid_fields, imps["auctionId"] == bid_fields["bid_id"], 'left')

In [20]:
def column_lower_strip(df, colname):
  """Lower-case `colname` and trim leading/trailing spaces, keeping the column name."""
  cleaned = rtrim(ltrim(lower(col(colname))))
  return df.withColumn(colname, cleaned)

In [21]:
from pyspark.sql.functions import when

def format_region(impressions):
  """Normalize the 'region' column to lower-case two-letter US state codes.

  The column is lower-cased and stripped (column_lower_strip). Then:
  - full state names (plus 'district of columbia') map to their postal code;
  - '' becomes 'blank_';
  - null becomes '-999';
  - any other value (e.g. an existing code) is kept unchanged.
  """
  state_codes = [
    ('alabama', 'al'), ('alaska', 'ak'), ('arizona', 'az'), ('arkansas', 'ar'),
    ('california', 'ca'), ('colorado', 'co'), ('connecticut', 'ct'), ('delaware', 'de'),
    ('district of columbia', 'dc'), ('florida', 'fl'), ('georgia', 'ga'), ('hawaii', 'hi'),
    ('idaho', 'id'), ('illinois', 'il'), ('indiana', 'in'), ('iowa', 'ia'),
    ('kansas', 'ks'), ('kentucky', 'ky'), ('louisiana', 'la'), ('maine', 'me'),
    ('maryland', 'md'), ('massachusetts', 'ma'), ('michigan', 'mi'), ('minnesota', 'mn'),
    ('mississippi', 'ms'), ('missouri', 'mo'), ('montana', 'mt'), ('nebraska', 'ne'),
    ('nevada', 'nv'), ('new hampshire', 'nh'), ('new jersey', 'nj'), ('new mexico', 'nm'),
    ('new york', 'ny'), ('north carolina', 'nc'), ('north dakota', 'nd'), ('ohio', 'oh'),
    ('oklahoma', 'ok'), ('oregon', 'or'), ('pennsylvania', 'pa'), ('rhode island', 'ri'),
    ('south carolina', 'sc'), ('south dakota', 'sd'), ('tennessee', 'tn'), ('texas', 'tx'),
    ('utah', 'ut'), ('vermont', 'vt'), ('virginia', 'va'), ('washington', 'wa'),
    ('west virginia', 'wv'), ('wisconsin', 'wi'), ('wyoming', 'wy'),
  ]
  impressions = column_lower_strip(impressions, 'region')
  region = impressions['region']
  mapped = when(region == '', "blank_")
  for state_name, code in state_codes:
    mapped = mapped.when(region == state_name, code)
  # .groupby('region').count().orderBy('count', ascending=False) gives counts by region
  return (impressions
          .withColumn('region', mapped.otherwise(region))
          .na.fill('-999', subset=['region']))

In [22]:
import pyspark.sql.functions as func
def reduce_cardinality(df, colname, limit, mode=0):
  """Collapse the least frequent values of `colname` into a single 'others' group.

  df: Spark DataFrame.
  colname: name of the categorical column to reduce.
  limit:
    mode == 0 -> number of groups to keep (the most frequent ones); all other
                 groups become 'others'. A limit >= the number of groups keeps
                 every group.
    mode != 0 -> minimum count a group needs to keep its own value; groups with
                 Count < limit become 'others'.
  mode: selects how `limit` is interpreted (see above).

  Returns df with `colname` rewritten through a StringType UDF, so the column
  comes back as strings.
  """
  groups = df.groupBy(colname).agg(func.count(colname).alias('Count')).sort('Count')
  if mode == 0:
    # clamp at 0: a negative LIMIT is rejected by Spark
    minor_groups = groups.limit(max(0, groups.count() - limit))
  else:
    minor_groups = groups.filter(groups.Count < limit)

  # a set gives O(1) membership per row instead of scanning every minor group
  minor_names = set(row[0] for row in minor_groups.select(col(colname)).collect())

  def coalesce_residuals(grpname):
    return 'others' if grpname in minor_names else grpname

  udfGroup = udf(coalesce_residuals, StringType())
  return df.withColumn(colname, udfGroup(colname))

In [23]:
def clean_app(x):
  """Return the first meaningful word of an app / venue name.

  '-' and '_' are treated as spaces, as is an 'x' between two digits (ad sizes
  such as '320x50'). The first word longer than 2 characters whose lower-case
  form is not a stopword is returned. If there is no such word, or x is not a
  string (e.g. None), x is returned unchanged.
  """
  import re
  stopwords = {'ios', 'android', '320', '50', '250', 'a', 'the', 'an', 'as', 'i', 'we', 'app'}
  try:
    text = x.replace('-', ' ').replace('_', ' ')
    # split only 'WxH' size tokens; replacing every 'x' mangled words ('Netflix' -> 'Netfli')
    text = re.sub(r'(?<=\d)x(?=\d)', ' ', text)
  except (AttributeError, TypeError):  # None / non-string input
    return x
  for word in text.split():
    if len(word) > 2 and word.lower() not in stopwords:
      return word  # return first non-stop word
  return x

def clean_bestVenueName(df):  # TODO match with app name from apps
  """Replace bestVenueName with its first meaningful word (see clean_app)."""
  first_word_udf = udf(clean_app, StringType())
  return df.withColumn('bestVenueName', first_word_udf(df['bestVenueName']))

In [24]:
def clean_landingPage(df):
  getdomain = udf(lambda r: urlparse(r).netloc.replace('www.',''), StringType())
  return(df.withColumn('landingPage', getdomain(df.landingPage)))

In [25]:
from pyspark.mllib.clustering import KMeans
from pyspark.sql.types import *
import pandas as pd
import numpy as np
import random
def add_kmcluster(impressions, n_clusters=500, seed=50):
  """Attach a KMeans geo-cluster id to every impression.

  Steps:
  1. Derive 'lat'/'lon' from location[0]/location[1].
  2. Collect (auctionId, dimensions, lat, lon) to the driver via toPandas().
  3. Impute missing coordinates with random points on a 0.05-degree grid
     inside a fixed lat/lon box.
  4. Cluster the coordinates with MLlib KMeans and join the cluster id back.

  impressions: Spark DataFrame with 'auctionId', 'dimensions' and an indexable
    'location' column (presumably [lat, lon] -- TODO confirm order).
  n_clusters: KMeans k (500 by default).
  seed: seeds both the coordinate imputation and the KMeans initialization,
    so repeated runs give the same clusters.

  Returns impressions with added 'lat', 'lon' and string 'cluster' columns.
  Rows with a null location are kept: the join back uses the original
  (possibly null) values with null-safe equality. The imputed coordinates are
  used only for clustering.
  NOTE: every selected row is collected to the driver.
  """
  from pyspark.sql.functions import expr

  impressions = impressions.withColumn('lat', col('location')[0]).withColumn('lon', col('location')[1])
  location_pd = impressions.select('auctionId', 'dimensions', 'lat', 'lon').toPandas()

  def _is_null(v):
    # toPandas() turns Spark nulls into None (object columns) or NaN (float columns)
    return v is None or v != v

  # Random geo imputation inside a fixed lat/lon box
  US_lat_north = 46
  US_lat_south = 35
  US_lon_west = -116
  US_lon_east = -93
  rng = random.Random(seed)
  lat_grid = np.arange(US_lat_south, US_lat_north, 0.05)
  lon_grid = np.arange(US_lon_west, US_lon_east, 0.05)
  lat_raw = location_pd['lat'].tolist()
  lon_raw = location_pd['lon'].tolist()
  lat_filled = [rng.choice(lat_grid) if _is_null(v) else v for v in lat_raw]
  lon_filled = [rng.choice(lon_grid) if _is_null(v) else v for v in lon_raw]
  # n x 2 coordinate matrix (DataFrame.as_matrix no longer exists in pandas)
  coords = np.column_stack((lat_filled, lon_filled)).astype(float)

  # Run KMeans
  coords_rdd = sc.parallelize(coords)
  model = KMeans.train(coords_rdd, n_clusters, maxIterations=40, initializationMode="random",
                       seed=seed, initializationSteps=5, epsilon=1e-4)
  # Cluster prediction & labels (collect preserves the parallelized order)
  cluster_labels = model.predict(coords_rdd).collect()
  num_clusters = model.k
  message = 'Clustered %d points down to %d clusters, for %f compression'
  print(message % (len(coords), num_clusters, 100 * (1 - float(num_clusters) / len(location_pd))))

  cluster_schema = StructType([StructField("auctionId2", StringType(), True),
                               StructField("dimensions2", StringType(), True),
                               StructField("lat2", DoubleType(), True),
                               StructField("lon2", DoubleType(), True),
                               StructField("cluster", StringType(), True)])
  # join keys carry the ORIGINAL values (None where Spark had null), not the imputed coordinates
  rows = [(None if _is_null(a) else a,
           None if _is_null(d) else d,
           None if _is_null(la) else float(la),
           None if _is_null(lo) else float(lo),
           str(c))
          for a, d, la, lo, c in zip(location_pd['auctionId'].tolist(),
                                     location_pd['dimensions'].tolist(),
                                     lat_raw, lon_raw, cluster_labels)]
  location_cluster = spark.createDataFrame(rows, cluster_schema)

  # Join location clusters back onto impressions. A plain == never matches
  # null keys, which silently dropped every row with a missing location.
  # <=> (null-safe equality) still plans as an equi-join.
  join_cond = expr("auctionId <=> auctionId2 AND dimensions <=> dimensions2 "
                   "AND lat <=> lat2 AND lon <=> lon2")
  impressions = impressions.join(location_cluster, join_cond, "inner").dropDuplicates()
  return impressions.drop('lat2', 'lon2', 'dimensions2', 'auctionId2')

In [26]:
import math
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType

def _logloss(l, p, eps=1e-15):
  """Binary log-loss of one prediction.

  l: label (0 or 1).
  p: probability vector indexed as p[0] = P(class 0), p[1] = P(class 1)
     (presumably a Spark ML 'probability' column -- TODO confirm).
  Probabilities are clipped to [eps, 1 - eps] so a prediction of exactly 0 or 1
  does not raise "math domain error" inside the UDF.
  """
  p0 = min(max(float(p[0]), eps), 1 - eps)
  p1 = min(max(float(p[1]), eps), 1 - eps)
  return -l * math.log(p1) - (1 - l) * math.log(p0)

logloss = udf(_logloss, FloatType())

In [27]:
#functions that return features and feature names lists
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StringIndexerModel
# VectorAssembler and StringIndexerModel (imported above) are used for the stage type checks below
def getFeatureCols(pipelinemodel):
  """Return the input column names of the first VectorAssembler stage.

  Raises ValueError when the pipeline contains no VectorAssembler (instead of
  handing back an exception object that callers would treat as a column list).
  """
  for stage in pipelinemodel.stages:
    if isinstance(stage, VectorAssembler):
      return stage.getInputCols()
  raise ValueError("no vector assembler")

def getFeatureNames(pipelinemodel):
  '''
  Generator yielding the `labels` list of every StringIndexerModel stage, in
  pipeline order. Each list is ordered by label index (most common to least
  per the StringIndexer transformation).
  '''
  indexers = (stage for stage in pipelinemodel.stages if type(stage) == StringIndexerModel)
  for indexer in indexers:
    yield indexer.labels

In [28]:
# Marker printed once the helper definitions above have been run.
print("Helper functions loaded")