In [None]:
# I used a mix of pyspark and pandas throughout this notebook, sometimes alternating
# to show the same command carried out both in pyspark and pandas, between different questions.
# I did this because I assumed that at some point, big data processes might become
# important for this analyst team at N26.

In [None]:
#1
# Identify all customers that we need to report as tax-liable 
# based on their personal data and the following criteria. 
# Only ONE of the following criteria needs to be met 
# for a customer to be included in the report.

In [None]:
# All customers with tax IDs in Germany, France, Italy or Spain.
### tax_id file. 'country' variable.

# All customers that live in Germany, France, Italy or Spain.
### customer_data file. 'country' variable

# All customers that have phone numbers of Germany, France, Italy or Spain.
### customer_data file. 'mobile_phone_no' variable (respective country calling codes: +49, +33, +39, +34)

In [None]:
# Because customers only have to meet one of these criteria, I broke the selection process
# into 3 steps, which I then merged together again at the end.

In [None]:
import findspark
# init() must run before `import pyspark` so the local Spark install is importable.
findspark.init()
import pyspark
# Displayed as the cell output: the Spark home directory findspark located.
findspark.find()

In [None]:
# Initiate Spark session

In [None]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pandas as pd

# getOrCreate() reuses an already-running session, so re-running this cell no longer
# fails with "Cannot run multiple SparkContexts at once".
spark = (SparkSession.builder
         .appName('appName')
         .master('local')
         .getOrCreate())
# Handle on the underlying context; the last cell of the notebook calls sc.stop().
sc = spark.sparkContext

In [None]:
# Load the customer data

In [None]:
customer = (spark.read.format("csv").options(header="true")
    .load("customer_data.csv"))

In [None]:
# Load the tax data

In [None]:
taxes = (spark.read.format("csv").options(header="true")
    .load("tax_ids.csv"))

In [None]:
# Make the user_id naming consistent across files.
customer = customer.withColumnRenamed('id','user_id')

In [None]:
# Identify all customers who live in the 4 countries, from original customer file.
customer_country = customer[customer.country.isin("Germany", "France","Italy", "Spain")].collect()

In [None]:
spark_cust_country = spark.createDataFrame(customer_country)

In [None]:
spark_cust_country.first()

In [None]:
from pyspark.sql.functions import col, lit
from functools import reduce

In [None]:
# Identify all customers who have phone numbers from the 4 countries, from original customer file.
# Country calling codes: Germany +49, France +33, Italy +39, Spain +34.
phone_prefixes = ["+49", "+33", "+39", "+34"]
phone_condition = " OR ".join("mobile_phone_no like '" + prefix + "%'" for prefix in phone_prefixes)
phone = customer.filter(phone_condition)

In [None]:
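# Convert the reduced data sets to pandas. toPandas() collects every row onto the driver, so it is
# only done after Spark has filtered the data down; that ordering is what keeps this workable for very big data sets.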

In [None]:
# Both Spark frames are already filtered, so only matching rows are pulled onto the driver.
pd_phone, pd_country = (frame.toPandas() for frame in (phone, spark_cust_country))

In [None]:
# Identify all customers who have tax IDs in the 4 countries.
tax_country = taxes[taxes.country.isin("Germany", "France","Italy", "Spain")].collect()

In [None]:
spark_tax = spark.createDataFrame(tax_country)

In [None]:
pd_tax = spark_tax.toPandas()

In [None]:
# Group the tax records by user: each key (column 0 of pd_tax) maps to a list of
# {'country': <column 1>, 'id': <column 2>} dicts, so a user with several tax IDs
# keeps all of them instead of the last one overwriting the others.
# NOTE(review): columns are read by position -- confirm against the tax_ids.csv header.
taxdict = {}
for user, tax_country_value, tax_id_value in zip(pd_tax.iloc[:, 0], pd_tax.iloc[:, 1], pd_tax.iloc[:, 2]):
    taxdict.setdefault(user, []).append({'country': tax_country_value, 'id': tax_id_value})

In [None]:
# Turn the dictionary into a list of [user_id, tax_records] pairs.
list_key_value = list(map(list, taxdict.items()))

In [None]:
# Check that users with more than one tax ID kept all of them. Select them by content
# rather than by a hard-coded list position, which shifts whenever the input order does.
[entry for entry in list_key_value if len(entry[1]) > 1][:5]

In [None]:
# Create the tax_information variable shown in the .json file:
# one row per user_id, holding that user's list of tax records.
back_to_pd_tax = pd.DataFrame.from_records(list_key_value, columns=['user_id', 'tax_information'])

In [None]:
# Merge the tax with the meta customer file, to fill in blanks for the tax customers.
# Assumption: null values for tax_information (as opposed to empty lists) are acceptable.

In [None]:
pd_customer = customer.toPandas()

In [None]:
meta_merge = pd.merge(pd_customer, back_to_pd_tax, how='outer', on='user_id')

In [None]:
meta_merge.info()

In [None]:
meta_merge = meta_merge.dropna(subset=['tax_information'])

In [None]:
meta_merge.info()

In [None]:
# Merge the customer data for phone and country.
first_merge = pd.merge(pd_phone, pd_country, how='outer', on=['user_id', 'country','iban',
            'account_opened', 'mobile_phone_no', 'account_closed'])

In [None]:
# Merge the customer data with the tax data
question_one_final = pd.merge(first_merge, meta_merge, how='outer', on=['user_id', 'country','iban',
            'account_opened', 'mobile_phone_no', 'account_closed'])

In [None]:
# Relative path, like the input CSVs, instead of a machine-specific absolute path.
# The RangeIndex carries no information, so it is not written as an extra column.
question_one_final.to_csv("question_one_final.csv", index=False)

In [None]:
###########################################

In [None]:
#2
# Identify all customers that we need to report as tax-liable 
# based on their transaction data. 
# Any customer with at least one transaction that 
# fulfills ALL of the following criteria is tax-liable.

In [None]:
# Transaction in 2018.
### Transaction data file. 'date' variable.

#Transaction type is a direct debit.
### Transaction data file. 'type' variable.

#Recipient IBAN is in Germany, France, Italy or Spain. = transactions
### Transaction data file. 'counterparty' variable.

In [None]:
# Since customers have to meet all 3 criteria, I whittled down the file step by step.

In [None]:
import datetime as dt
from datetime import date

In [None]:
# Read file directly as pandas dataframe

In [None]:
pd_transaction = pd.read_csv('transactions.csv')

In [None]:
pd_transaction.info()

In [None]:
# Turn the 'date' variable into a datetime variable.
pd_transaction['date'] = pd.to_datetime(pd_transaction['date'])

In [None]:
# Setting the start and end dates of the year, 2018.
start_date = "2018-01-01"
end_date = "2018-12-31"

In [None]:
# Creating a mask to filter out the values that are not from 2018 (none)

In [None]:
mask = (pd_transaction['date'] >= start_date) & (pd_transaction['date'] <= end_date )

In [None]:
# Selecting only dates from 2018
pd_transaction = pd_transaction.loc[mask]

In [None]:
pd_transaction.info()

In [None]:
# Creating a new data frame that has only direct debit transactions. From working a lot with SAS, I tend to create extra
# data frames, for backup/double checking.
pd_deb_clear = pd_transaction[pd_transaction.type == 'direct_debit']

In [None]:
# Using a pandas way, as opposed to the previously used pyspark way, to select
# observations from the 4 countries.
question_two_penul= pd_deb_clear[pd_deb_clear['counterparty'].str.startswith(('FR', 'IT', 'ES', 'DE'))]

In [None]:
# Copying only the user column because we don't need the other columns

In [None]:
# One row per qualifying user, flagged with criteria_met = 1 so these observations are
# easy to track after the later merges. Dropping duplicates is safe because each
# customer only needs to be reported once.
two_cleaning = (
    question_two_penul[['user_id']]
    .drop_duplicates()
    .assign(criteria_met=1)
)

In [None]:
# Because the transaction data didn't have any IBAN numbers, I load the customer file and merge it with
# the reduced transaction file (i.e., criteria_met file)

In [None]:
iban = pd.read_csv('customer_data.csv')

In [None]:
iban = iban.rename(columns={"id": "user_id"})

In [None]:
# Merge on inner, because we only want matching records

In [None]:
iban_fill = pd.merge(two_cleaning, iban, how='inner', on='user_id')

In [None]:
# Merging the end results of question/task 1 and 2. Outer merge because we want all records from both files.
# Merging on many variables so as to not end up with duplicate columns.

In [None]:
question_one_two = pd.merge(question_one_final, iban_fill, how='outer', on=['user_id', 'country','iban',
            'account_opened', 'mobile_phone_no', 'account_closed'])

In [None]:
#3
# Report only customers that had an active account in 2018 
#(had an open account on at least one day in 2018)
### customer data file. 'account_opened' and 'account_closed' variables.

In [None]:
pd_customers = question_one_two

In [None]:
# Turn the 'account_opened' and 'account_closed' variables into datetime variables.
pd_customers['account_opened'] = pd.to_datetime(pd_customers['account_opened'])

In [None]:
pd_customers['account_closed'] = pd.to_datetime(pd_customers['account_closed'])

In [None]:
# Set start and stop dates for opening and closing of accounts
# Remove anyone who opened an account after the last day of 2018
# Removed anyone who closed their account before the first day of 2018

In [None]:
opened = "2018-12-31"
closed = "2018-01-01"

In [None]:
pd_customers = pd_customers.drop(pd_customers[(pd_customers.account_opened > opened)].index)

In [None]:
pd_customers = pd_customers.drop(pd_customers[(pd_customers.account_closed < closed)].index)

In [None]:
report = pd_customers.copy()

In [None]:
#Create a table of all users identified based on 1), 2) and 3) and ensure that we 
#do not double-report any customer. We need to report 
#a customer’s IBAN, account opening, account closure dates and, 
#if available their tax country and IDs. 
#Be aware that we do not have tax IDs for all customers. 
#In that scenario, it suffices that we report IBAN, account opening and 
#closure dates as well as an empty tax information line.

In [None]:
# Drop columns that we don't want.

In [None]:
# Drop the helper columns and rename the date columns to match the example file.
report = (
    report
    .drop(columns=["user_id", "country", "mobile_phone_no", "criteria_met"])
    .rename(columns={"account_opened": "opened", "account_closed": "closed"})
)

In [None]:
# Date time objects need to be converted to string before we convert to json

In [None]:
report.info()

In [None]:
# Also, the format had to be changed. This does both, for the 'opened' variable.

In [None]:
report['opened'] = report['opened'].apply(lambda x: x.strftime('%Y-%m-%d'))

In [None]:
report.info()

In [None]:
# This does the same for the 'closed' variable

In [None]:
report['closed'] = report['closed'].dt.strftime('%Y-%m-%d')

In [None]:
# Replacing the "NaT" values with "null" values.

In [None]:
report['closed'] = report['closed'].apply(lambda x : "null" if x=="NaT" else x)

In [None]:
#Save the users as a JSON file with the layout as outlined in example_output.json.

In [None]:
# Relative path, like the input CSVs, so the notebook runs on any machine.
report.to_json('report_final.json', orient='records')

In [None]:
#import json

In [None]:
#df_json_pretty = json.dumps(json.loads(report.to_json()), indent=3)

In [None]:
# Shut down the Spark context started at the top of the notebook.
spark.sparkContext.stop()