In [15]:
#libary & logging
from pyspark.sql import SparkSession
from functools import reduce
import logging
import sys
from logging.handlers import TimedRotatingFileHandler
import configparser

#config file that contains parameters such as dataset and countries to filter on
config = configparser.ConfigParser()
config.read('config.ini')

#logging formatting and filename


#arguments to rename and drop columns 
newColumns =['client_identifier','email', 'country', 'bitcoin_address', 'credit_card_type']
columns_to_drop=['first_name','last_name','cc_n']
listofcountries= ['United Kingdom'], ['Netherlands']
#logging args

def main():
    logger = get_logger("ABN exercise")
    #spark session
    spark = SparkSession \
        .builder \
        .appName("abn exercise") \
        .getOrCreate()
    logger.info("started Spark Session")
    
    df1 = spark.read.csv(config['datasets']['dataset_one'],header=True)
    df2 = spark.read.csv(config['datasets']['dataset_two'],header=True)
    
    logger.info("loaded datasets")
    
    data = df1.join(df2,'id')
    logger.info("joined datasets")

    
    droppeddata = data.drop(*columns_to_drop)
    logger.info("dropped columns")
    
    
    renameddata=renameColumns(droppeddata,newColumns)
    logger.info("renamed columns")
    
    #get list from config ini
    #filterlist=config['filters']['countries'].split(',')
    
    filtereddata=filterCountry(renameddata, listofcountries)
    #config['filters']['countries'].split(';')
    logger.info("filtered data by specified countries")
    
    filtereddata.write.csv('client_data',header='true', mode='overwrite')
    
    logger.info("writing to client_data folder")    
    
    return None
    
    
#filter dataset to countries supplied in arguments
def filterCountry(df,loc):
    #'''Takes dataframe and array of desired countries as input, returns filterd dataframe'''
    filteredresult=df.filter(df.country.isin(loc))
    return filteredresult


#rename columns
def renameColumns(df,newcol):
    #'''Takes dataframe and list of new columns as input, returns dataframe with renamed columns. Number of input / rename columns must be equal!'''
    oldColumns = df.schema.names
    renamedresult = reduce(lambda df, idx: df.withColumnRenamed(oldColumns[idx], newcol[idx]), range(len(oldColumns)), df)
    return renamedresult

#logging logic: 
#https://www.toptal.com/python/in-depth-python-logging
FORMATTER = logging.Formatter("%(asctime)s — %(name)s — %(levelname)s — %(message)s")

def get_console_handler():
   #'''Setup console handler and formatting'''
   console_handler = logging.StreamHandler(sys.stdout)
   console_handler.setFormatter(FORMATTER)
   return console_handler
def get_file_handler():
    #'''Setup log file handler and formatting'''
   file_handler = TimedRotatingFileHandler('abn.log', when='midnight')
   file_handler.setFormatter(FORMATTER)
   #file_handler.suffix="%Y%m%d%" 
   return file_handler
def get_logger(logger_name):
    #'''set log level, currently on DEBUG'''
   logger = logging.getLogger(logger_name)
   logger.setLevel(logging.DEBUG) # debug level logging
   logger.addHandler(get_console_handler())
   logger.addHandler(get_file_handler())
   # with this pattern, it's rarely necessary to propagate the error up to parent
   logger.propagate = False
   return logger


if __name__ == '__main__':
    main()
    
    

2021-05-31 11:52:22,185 — ABN exercise — INFO — started Spark Session
2021-05-31 11:52:22,185 — ABN exercise — INFO — started Spark Session
2021-05-31 11:52:22,185 — ABN exercise — INFO — started Spark Session
2021-05-31 11:52:22,185 — ABN exercise — INFO — started Spark Session
2021-05-31 11:52:22,185 — ABN exercise — INFO — started Spark Session
2021-05-31 11:52:22,185 — ABN exercise — INFO — started Spark Session
2021-05-31 11:52:22,185 — ABN exercise — INFO — started Spark Session
2021-05-31 11:52:22,185 — ABN exercise — INFO — started Spark Session
2021-05-31 11:52:22,185 — ABN exercise — INFO — started Spark Session
2021-05-31 11:52:22,185 — ABN exercise — INFO — started Spark Session
2021-05-31 11:52:22,185 — ABN exercise — INFO — started Spark Session
2021-05-31 11:52:22,185 — ABN exercise — INFO — started Spark Session
2021-05-31 11:52:22,185 — ABN exercise — INFO — started Spark Session
2021-05-31 11:52:22,185 — ABN exercise — INFO — started Spark Session
2021-05-31 11:52:22,

Py4JJavaError: An error occurred while calling z:org.apache.spark.sql.functions.lit.
: java.lang.RuntimeException: Unsupported literal type class java.util.ArrayList [[United Kingdom], [Netherlands]]
	at org.apache.spark.sql.catalyst.expressions.Literal$.apply(literals.scala:90)
	at org.apache.spark.sql.catalyst.expressions.Literal$.$anonfun$create$2(literals.scala:152)
	at scala.util.Failure.getOrElse(Try.scala:222)
	at org.apache.spark.sql.catalyst.expressions.Literal$.create(literals.scala:152)
	at org.apache.spark.sql.functions$.typedLit(functions.scala:131)
	at org.apache.spark.sql.functions$.lit(functions.scala:114)
	at org.apache.spark.sql.functions.lit(functions.scala)
	at jdk.internal.reflect.GeneratedMethodAccessor54.invoke(Unknown Source)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:829)
