In [303]:
import pandas as pd
import numpy as np

In [304]:
# Import the get_engine function from our sql_functions.
from sql_functions import get_engine #adjust this as necessary to match your sql_functions.py connection methods

In [305]:
# Locations of the raw CSV exports, relative to the notebook's working directory
vouchers = './data/promo_vouchers.csv'
subscriptions = './data/subscriptions.csv'
playbacks = './data/playbacks.csv'
accounts = './data/accounts.csv'

# ACCOUNTS

In [306]:
# Load the accounts export (21.10.2020 - 01.10.2022) and lowercase the
# column names in the same step
df_accounts = pd.read_csv(accounts).rename(columns=str.lower)

In [307]:
# Parse both account timestamp columns into datetimes
for date_col in ('registration_date', 'lastlogin_date'):
    df_accounts[date_col] = pd.to_datetime(df_accounts[date_col])

## Postal Code

In [308]:
# Reduce the raw postal code to its first run of digits.
# Hyphens are removed first (literal match, no regex) so that a code written
# with a hyphen collapses into a single run of digits before extraction.
postal_code_digits = (
    df_accounts['postal_code']
    .str.replace('-', '', regex=False)
    # Raw string: '\d' inside a normal string literal is an invalid escape sequence
    .str.extract(r'(\d+)', expand=False)
)
# Rows without any digits (or without a postal code) get the placeholder 0.
# The result is assigned back explicitly: fillna(inplace=True) on a selected
# column is a chained in-place operation and leaves the frame unchanged under
# pandas copy-on-write.
df_accounts['postal_code_clean'] = postal_code_digits.fillna(0)

In [309]:
# Cast the cleaned postal codes to integers
df_accounts = df_accounts.astype({'postal_code_clean': int})

## City

### Mapping plz_files to accounts table for further geographical information

In [310]:
# Postal code reference files (Switzerland, Swiss cantons, Germany, Austria)
plz_ch = './data/plz_verzeichnis_ch.csv'
plz_kanton = './data/plz_kantone_ch.csv'
plz_de = './data/plz_verzeichnis_de.csv'
plz_at = './data/plz_verzeichnis_at.csv'
# Load each file with its own delimiter and lowercase the column names right away
df_plz_ch = pd.read_csv(plz_ch, sep=';').rename(columns=str.lower)
df_plz_kanton = pd.read_csv(plz_kanton, sep=';').rename(columns=str.lower)
df_plz_de = pd.read_csv(plz_de, sep=',').rename(columns=str.lower)
df_plz_at = pd.read_csv(plz_at, sep=';').rename(columns=str.lower)



In [311]:
# Clean plz_kanton: keep only the relevant columns, give them the shared
# postal_code / city / state names, drop duplicate rows and tag the rows as Swiss.
# Written as one chain that produces a new frame, so nothing is modified in
# place on a column selection (which triggers SettingWithCopyWarning).
df_plz_kanton = (
    df_plz_kanton[['postleitzahl / code postal / codice postale', 'ort / ville / città', 'kanton']]
    .rename(columns={
        'postleitzahl / code postal / codice postale': 'postal_code',
        'ort / ville / città': 'city',
        'kanton': 'state',
    })
    .drop_duplicates()
    # country_code differentiates the rows once the national tables are combined
    .assign(country_code='CH')
)

In [312]:
# Clean plz_de: keep only the relevant columns, give them the shared
# postal_code / city / state names, drop duplicate rows and tag the rows as German.
# Written as one chain that produces a new frame, so nothing is modified in
# place on a column selection (which triggers SettingWithCopyWarning).
df_plz_de = (
    df_plz_de[['plz', 'ort', 'bundesland']]
    .rename(columns={'plz': 'postal_code', 'ort': 'city', 'bundesland': 'state'})
    .drop_duplicates()
    # country_code differentiates the rows once the national tables are combined
    .assign(country_code='DE')
)

In [313]:
# Clean plz_at: keep only the relevant columns, give them the shared
# postal_code / city / state names, drop duplicate rows and tag the rows as Austrian.
# Written as one chain that produces a new frame, so nothing is modified in
# place on a column selection (which triggers SettingWithCopyWarning).
df_plz_at = (
    df_plz_at[['plz', 'ort', 'bundesland']]
    .rename(columns={'plz': 'postal_code', 'ort': 'city', 'bundesland': 'state'})
    .drop_duplicates()
    # country_code differentiates the rows once the national tables are combined
    .assign(country_code='AT')
)

In [None]:
# Stack the three national lookup tables into one frame for merging
df_plz_all = pd.concat([df_plz_kanton, df_plz_de, df_plz_at])
# A postal code can be listed with several cities; keep the first entry of each.
# The grouping key includes country_code: the combined table is later joined on
# postal code AND country code, so grouping on postal_code alone would discard
# the entries of every country but the first for a number used in more than one.
# Columns are selected with a list because tuple-style selection on a groupby
# (['city', 'state', ...] without the inner brackets) was removed in pandas 2.0.
df_plz_all = (
    df_plz_all
    .groupby(['postal_code', 'country_code'], as_index=False)[['city', 'state']]
    .first()
    # restore the column order the previous implementation produced
    [['postal_code', 'city', 'state', 'country_code']]
)

In [315]:
# Left-join city and state onto the accounts, matching on the cleaned postal
# code together with the country code
df_accounts = df_accounts.merge(
    df_plz_all,
    how='left',
    left_on=['postal_code_clean', 'country_code'],
    right_on=['postal_code', 'country_code'],
)

In [None]:
# Clean plz_ch for language information: keep postal code and language code,
# rename the postal code column and tag the rows as Swiss.
# Written as a chain that produces a new frame, so nothing is modified in
# place on a column selection (which triggers SettingWithCopyWarning).
df_plz_ch = (
    df_plz_ch[['postleitzahl', 'sprachcode']]
    .rename(columns={'postleitzahl': 'postal_code'})
    # country_code is needed as the second join key against the accounts table
    .assign(country_code='CH')
)
# A postal code can be listed with more than one language code; keep the first entry.
# Columns are selected with a list because tuple-style selection on a groupby
# was removed in pandas 2.0.
df_plz_ch = df_plz_ch.groupby('postal_code', as_index=False)[['sprachcode', 'country_code']].first()

'''
Mapping of the language code:
#1 = German  
#2 = French  
#3 = Italian 
'''

In [317]:
# Left-join the language code onto the accounts, matching on the cleaned
# postal code together with the country code
df_accounts = df_accounts.merge(
    df_plz_ch,
    how='left',
    left_on=['postal_code_clean', 'country_code'],
    right_on=['postal_code', 'country_code'],
)

In [318]:
# Drop the postal code key columns brought in by the two lookup merges.
# postal_code_x (the postal code as it came from the accounts file) is kept:
# it was previously dropped here as well, which turned its rename below into
# a no-op and silently discarded the original value.
df_accounts = df_accounts.drop(columns=['postal_code_y', 'postal_code'])
# Give the merge-suffixed columns descriptive names
df_accounts = df_accounts.rename(columns={
    'postal_code_x': 'postal_code_original',
    'city_x': 'city_original',
    'city_y': 'city_clean',
    'sprachcode': 'language_code',
})

## Country_Code

### Add country name and region information

In [319]:
# Load the country reference table and lowercase its column names
country = './data/country_code.csv'
df_country = pd.read_csv(country).rename(columns=str.lower)

In [320]:
# Keep only the relevant columns and rename them to the naming used in the
# accounts table. Written as a chain that produces a new frame, so the rename
# is not applied in place on a column selection (SettingWithCopyWarning).
df_country = (
    df_country[['name', 'alpha-2', 'region', 'sub-region']]
    .rename(columns={'alpha-2': 'country_code', 'name': 'country_name', 'sub-region': 'sub_region'})
)

In [321]:
# Left-join country name and region information onto the accounts by country code
df_accounts = df_accounts.merge(df_country, how='left', on='country_code')

## Language

In [322]:
# Replace missing language values with the placeholder 'na'.
# The result is assigned back explicitly: fillna(inplace=True) on a selected
# column is a chained in-place operation and leaves the frame unchanged under
# pandas copy-on-write.
df_accounts['language'] = df_accounts['language'].fillna('na')

# PLAYBACKS

In [323]:
# Load the playbacks export and lowercase the column names in the same step
df_playbacks = pd.read_csv(playbacks).rename(columns=str.lower)


## Changing datatypes and adding column playback_ID

In [324]:
# The source column date_start holds a full timestamp; rename it accordingly
df_playbacks = df_playbacks.rename(columns={'date_start': 'datetime_start'})

In [325]:
# Parse the playback start timestamps into datetimes
playback_start = pd.to_datetime(df_playbacks['datetime_start'])
df_playbacks['datetime_start'] = playback_start

In [326]:
# Add a column holding only the calendar date of the playback start
start_timestamps = pd.to_datetime(df_playbacks['datetime_start'])
df_playbacks['date_start'] = start_timestamps.dt.date

In [327]:
# Convert the date-only column back to datetimes with the time set to midnight
start_dates = pd.to_datetime(df_playbacks['date_start'])
df_playbacks['date_start'] = start_dates.dt.normalize()

In [None]:
# Preview the first five playback rows
df_playbacks.head(5)

In [329]:
# Insert playback_id as the first column, numbered in descending order:
# the first row gets the total row count, the last row gets 1
playback_count = len(df_playbacks)
df_playbacks.insert(0, 'playback_id', range(playback_count, 0, -1))

## Add category according to user_agent

In [None]:
# Derive the playback device category and an app-user flag from user_agent.
# na=False makes rows without a user_agent count as "no match"; without it the
# masks contain NaN, and both the .loc assignments and the ~ negation raise.
is_desktop = df_playbacks['user_agent'].str.contains('Windows|Macintosh|TV|Linux|Darwin|CrOS|PlayStation|FreeBSD', na=False)
is_mobile = df_playbacks['user_agent'].str.contains('Android|iOS|iPhone|iPad', na=False)
is_app = df_playbacks['user_agent'].str.contains('filmingo', na=False)

# Order matters: mobile is assigned after desktop, so a user agent matching
# both patterns (e.g. one containing 'Linux' and 'Android') ends up as 'mobile'.
# Rows matching neither pattern keep a missing device.
df_playbacks.loc[is_desktop, 'device'] = 'desktop'
df_playbacks.loc[is_mobile, 'device'] = 'mobile'

df_playbacks.loc[is_app, 'app_user'] = 'yes'
df_playbacks.loc[~is_app, 'app_user'] = 'no'

# Non-null counts per device category as a quick plausibility check
df_playbacks.groupby('device').count()

# SUBSCRIPTIONS

In [331]:
# Load the subscriptions export and lowercase the column names in the same step
df_subscriptions = pd.read_csv(subscriptions).rename(columns=str.lower)

In [332]:
# Parse the subscription start and end columns into datetimes
for date_col in ('subscription_start', 'subscription_end'):
    df_subscriptions[date_col] = pd.to_datetime(df_subscriptions[date_col])

In [333]:
# Remove the FULLACCESS rows: the type is unknown to us and occurs on a single
# line only, therefore it was decided to drop it
fullaccess_rows = df_subscriptions.index[df_subscriptions['subscription_type'] == 'FULLACCESS']
df_subscriptions = df_subscriptions.drop(index=fullaccess_rows)

In [334]:
# Subscription duration in (fractional) months.
# np.timedelta64(1, 'M') is not a fixed-length duration and pandas 2.x rejects
# it as a divisor for timedeltas. AVERAGE_MONTH spells out the length NumPy
# uses when converting a month to seconds (365.2425 days / 12 = 2,629,746 s),
# so the resulting values stay the same as before.
AVERAGE_MONTH = pd.Timedelta(seconds=2629746)
df_subscriptions['subscription_months_raw'] = (
    df_subscriptions['subscription_end'] - df_subscriptions['subscription_start']
) / AVERAGE_MONTH

In [335]:
# Whole-month subscription duration for easier further processing.
# 0.1 is deducted before rounding up, so durations up to 0.1 months above a
# whole number (day-level discrepancies) are not pushed into the next month.
months_with_tolerance = df_subscriptions['subscription_months_raw'] - 0.1
df_subscriptions['subscription_months'] = np.ceil(months_with_tolerance)

In [336]:
# Create two price columns (CHF and EUR) based on subscription_type and the
# monthly flag, using the prices from the filmingo website.

is_yearly = df_subscriptions['subscription_monthly'] == 0
is_monthly = df_subscriptions['subscription_monthly'] == 1

# One condition per priced plan; the order must match the value lists below
conditions = [
    (df_subscriptions['subscription_type'] == 'BASIC') & is_yearly,
    (df_subscriptions['subscription_type'] == 'BASIC') & is_monthly,
    (df_subscriptions['subscription_type'] == 'STANDARD') & is_yearly,
    (df_subscriptions['subscription_type'] == 'STANDARD') & is_monthly,
    (df_subscriptions['subscription_type'] == 'PATRON') & is_yearly,
]

# Prices per condition, as numbers. String prices combined with np.select's
# integer default have no common dtype on recent NumPy versions, and they
# needed a separate cast to float afterwards.
values_chf = [90.0, 9.0, 150.0, 15.0, 240.0]
values_eur = [75.0, 7.5, 125.0, 12.5, 200.0]

# Rows matching none of the conditions get a price of 0.0 (same as before)
df_subscriptions['price_chf'] = np.select(conditions, values_chf, default=0.0)
df_subscriptions['price_eur'] = np.select(conditions, values_eur, default=0.0)

# Decided to use these prices for all subscriptions, regardless of whether they
# might have a different price in the list or are gifted subscriptions


In [337]:
# Total price per subscription. Default case: price / 12 * subscription months
df_subscriptions['total_price_chf'] = df_subscriptions['price_chf'] / 12 * df_subscriptions['subscription_months']
df_subscriptions['total_price_eur'] = df_subscriptions['price_eur'] / 12 * df_subscriptions['subscription_months']

# Exceptions to the default calculation:

# Monthly subscriptions: the price is already per month, so price * months.
# The right-hand side is computed for all rows; .loc only writes the masked ones.
is_monthly = df_subscriptions['subscription_monthly'] == 1
df_subscriptions.loc[is_monthly, 'total_price_chf'] = df_subscriptions['price_chf'] * df_subscriptions['subscription_months']
df_subscriptions.loc[is_monthly, 'total_price_eur'] = df_subscriptions['price_eur'] * df_subscriptions['subscription_months']

# Gifted subscriptions of 6 months have a fixed price (there is only a 6 month
# subscription available for gifted subscriptions).
# Assigned as numbers: writing the strings '49' / '41' into these float columns
# would leave them with mixed types.
is_gift_6_months = (df_subscriptions['gift_subscription'] == True) & (df_subscriptions['subscription_months'] == 6)
df_subscriptions.loc[is_gift_6_months, 'total_price_chf'] = 49.0
df_subscriptions.loc[is_gift_6_months, 'total_price_eur'] = 41.0

# VOUCHERS

In [338]:
# Load the promo vouchers export (semicolon separated) and lowercase the
# column names in the same step
df_vouchers = pd.read_csv(vouchers, sep=';').rename(columns=str.lower)

In [339]:
# Parse the voucher creation and expiration columns into datetimes
for date_col in ('creationdate', 'expirationdate'):
    df_vouchers[date_col] = pd.to_datetime(df_vouchers[date_col])

In [340]:
# Cut off the time of creation (set it to midnight); the time is presumably not relevant.
# .dt.normalize() is used instead of .dt.date, which returns an object column.
creation_day = df_vouchers['creationdate'].dt.normalize()
df_vouchers['creationdate'] = creation_day

In [341]:
# Insert voucher_id as the first column, numbered ascending from 1
voucher_count = len(df_vouchers)
df_vouchers.insert(0, 'voucher_id', range(1, voucher_count + 1))

In [342]:
# Rename the voucher columns to snake_case and make sender / receiver explicit
voucher_column_names = {
    'account_key': 'account_key_sender',
    'email_hash': 'email_hash_receiver',
    'voucherused': 'voucher_used',
    'creationdate': 'creation_date',
    'expirationdate': 'expiration_date',
}
df_vouchers = df_vouchers.rename(columns=voucher_column_names)

In [343]:
# Truncate vouchers table to our time range
# start_date = '2020-10-01'
# end_date = '2022-10-01'
# mask = (df_vouchers['creation_date'] >= start_date) & (df_vouchers['creation_date'] <= end_date)
# df_vouchers = df_vouchers.loc[mask]

In [344]:
# Add the voucher_used flag to the accounts table. Only one voucher row per
# email_hash_receiver is considered (duplicates dropped, first row kept).
receiver_voucher_usage = df_vouchers.drop_duplicates('email_hash_receiver')[['email_hash_receiver', 'voucher_used']]
df_accounts = df_accounts.merge(
    receiver_voucher_usage,
    how='left',
    left_on='email_hash',
    right_on='email_hash_receiver',
)

In [345]:
# email_hash_receiver was only needed as the join key; remove it again
df_accounts = df_accounts.drop(columns='email_hash_receiver')

# DATE TABLE

In [None]:
# Calendar table with one row per day from 2020-01-01 through 2022-12-31
calendar_days = pd.date_range(start='2020-01-01', end='2022-12-31')
df_date = pd.DataFrame({"Date": calendar_days})

# PUSH DATA TO SQL SERVER

In [None]:
# Database engine used to query against / write to the database
engine = get_engine()
# Target schema; update it to the schema used in class
schema = 'capstone_filmingo'

In [None]:
# Preview the first five rows of the final accounts table before pushing
df_accounts.head(5)

## Accounts

In [None]:
# # Specify which table within your database you want to push your data to. Give your table an unambiguous name.
# table_name = 'accounts'
# # If the specified table doesn't exist yet, it will be created
# # With 'replace', your data will be replaced if the table already exists.
# # This may take some time ...

# # Write records stored in a dataframe to SQL database
# if engine!=None:
#     try:
#         df_accounts.to_sql(name=table_name, # Name of SQL table
#                         con=engine, # Engine or connection
#                         if_exists='replace', # Drop the table before inserting new values 
#                         schema=schema, # Use schema that was defined earlier
#                         index=False, # Do not write the DataFrame index as a column
#                         chunksize=5000, # Specify the number of rows in each batch to be written at a time
#                         method='multi') # Pass multiple values in a single INSERT clause
#         print(f"The {table_name} table was imported successfully.")
#     # Error handling
#     except (Exception, psycopg2.DatabaseError) as error:
#         print(error)
#         engine = None
# else:
#      print('Push did not work')

In [None]:
# Just to be sure: check that the row count of the SQL table matches the DataFrame.
# table_name is set here because the push cell that would define it is
# commented out, which leaves the name undefined on a fresh kernel.
table_name = 'accounts'
table_name_sql = f'''SELECT count(*) 
                    FROM {schema}.{table_name}
                    '''
# pd.read_sql_query is used instead of engine.execute(), which was removed in
# SQLAlchemy 2.0. The single cell of the result holds the table's row count.
sql_row_count = pd.read_sql_query(table_name_sql, con=engine).iloc[0, 0]
bool(sql_row_count == df_accounts.shape[0])

## Subscriptions

In [None]:
# # Specify which table within your database you want to push your data to. Give your table an unambiguous name.
# table_name = 'subscriptions'
# # If the specified table doesn't exist yet, it will be created
# # With 'replace', your data will be replaced if the table already exists.
# # This may take some time ...

# # Write records stored in a dataframe to SQL database
# if engine!=None:
#     try:
#         df_subscriptions.to_sql(name=table_name, # Name of SQL table
#                         con=engine, # Engine or connection
#                         if_exists='replace', # Drop the table before inserting new values 
#                         schema=schema, # Use schema that was defined earlier
#                         index=False, # Do not write the DataFrame index as a column
#                         chunksize=5000, # Specify the number of rows in each batch to be written at a time
#                         method='multi') # Pass multiple values in a single INSERT clause
#         print(f"The {table_name} table was imported successfully.")
#     # Error handling
#     except (Exception, psycopg2.DatabaseError) as error:
#         print(error)
#         engine = None
# else:
#      print('Push did not work')

In [None]:
# Just to be sure: check that the row count of the SQL table matches the DataFrame.
# table_name is set here because the push cell that would define it is
# commented out, which leaves the name undefined on a fresh kernel.
table_name = 'subscriptions'
table_name_sql = f'''SELECT count(*) 
                    FROM {schema}.{table_name}
                    '''
# pd.read_sql_query is used instead of engine.execute(), which was removed in
# SQLAlchemy 2.0. The single cell of the result holds the table's row count.
sql_row_count = pd.read_sql_query(table_name_sql, con=engine).iloc[0, 0]
bool(sql_row_count == df_subscriptions.shape[0])

## Playback

In [None]:
# # Specify which table within your database you want to push your data to. Give your table an unambiguous name.
# table_name = 'playbacks'
# # If the specified table doesn't exist yet, it will be created
# # With 'replace', your data will be replaced if the table already exists.
# # This may take some time ...

# # Write records stored in a dataframe to SQL database
# if engine!=None:
#     try:
#         df_playbacks.to_sql(name=table_name, # Name of SQL table
#                         con=engine, # Engine or connection
#                         if_exists='replace', # Drop the table before inserting new values 
#                         schema=schema, # Use schema that was defined earlier
#                         index=False, # Do not write the DataFrame index as a column
#                         chunksize=5000, # Specify the number of rows in each batch to be written at a time
#                         method='multi') # Pass multiple values in a single INSERT clause
#         print(f"The {table_name} table was imported successfully.")
#     # Error handling
#     except (Exception, psycopg2.DatabaseError) as error:
#         print(error)
#         engine = None
# else:
#      print('Push did not work')


In [None]:
# Just to be sure: check that the row count of the SQL table matches the DataFrame.
# table_name is set here because the push cell that would define it is
# commented out, which leaves the name undefined on a fresh kernel.
table_name = 'playbacks'
table_name_sql = f'''SELECT count(*) 
                    FROM {schema}.{table_name}
                    '''
# pd.read_sql_query is used instead of engine.execute(), which was removed in
# SQLAlchemy 2.0. The single cell of the result holds the table's row count.
sql_row_count = pd.read_sql_query(table_name_sql, con=engine).iloc[0, 0]
bool(sql_row_count == df_playbacks.shape[0])

## Promo Voucher

In [None]:
# # Specify which table within your database you want to push your data to. Give your table an unambiguous name.
# table_name = 'vouchers'
# # If the specified table doesn't exist yet, it will be created
# # With 'replace', your data will be replaced if the table already exists.
# # This may take some time ...

# # Write records stored in a dataframe to SQL database
# if engine!=None:
#     try:
#         df_vouchers.to_sql(name=table_name, # Name of SQL table
#                         con=engine, # Engine or connection
#                         if_exists='replace', # Drop the table before inserting new values 
#                         schema=schema, # Use schema that was defined earlier
#                         index=False, # Do not write the DataFrame index as a column
#                         chunksize=5000, # Specify the number of rows in each batch to be written at a time
#                         method='multi') # Pass multiple values in a single INSERT clause
#         print(f"The {table_name} table was imported successfully.")
#     # Error handling
#     except (Exception, psycopg2.DatabaseError) as error:
#         print(error)
#         engine = None
# else:
#      print('Push did not work')

In [None]:
# Just to be sure: check that the row count of the SQL table matches the DataFrame.
# table_name is set here because the push cell that would define it is
# commented out, which leaves the name undefined on a fresh kernel.
table_name = 'vouchers'
table_name_sql = f'''SELECT count(*) 
                    FROM {schema}.{table_name}
                    '''
# pd.read_sql_query is used instead of engine.execute(), which was removed in
# SQLAlchemy 2.0. The single cell of the result holds the table's row count.
sql_row_count = pd.read_sql_query(table_name_sql, con=engine).iloc[0, 0]
bool(sql_row_count == df_vouchers.shape[0])

## Date Table

In [None]:
# # Specify which table within your database you want to push your data to. Give your table an unambiguous name.
# table_name = 'date'
# # If the specified table doesn't exist yet, it will be created
# # With 'replace', your data will be replaced if the table already exists.
# # This may take some time ...

# # Write records stored in a dataframe to SQL database
# if engine!=None:
#     try:
#         df_date.to_sql(name=table_name, # Name of SQL table
#                         con=engine, # Engine or connection
#                         if_exists='replace', # Drop the table before inserting new values 
#                         schema=schema, # Use schema that was defined earlier
#                         index=False, # Do not write the DataFrame index as a column
#                         chunksize=5000, # Specify the number of rows in each batch to be written at a time
#                         method='multi') # Pass multiple values in a single INSERT clause
#         print(f"The {table_name} table was imported successfully.")
#     # Error handling
#     except (Exception, psycopg2.DatabaseError) as error:
#         print(error)
#         engine = None
# else:
#      print('Push did not work')

In [None]:
# Just to be sure: check that the row count of the SQL table matches the DataFrame.
# table_name is set here because the push cell that would define it is
# commented out, which leaves the name undefined on a fresh kernel.
table_name = 'date'
table_name_sql = f'''SELECT count(*) 
                    FROM {schema}.{table_name}
                    '''
# pd.read_sql_query is used instead of engine.execute(), which was removed in
# SQLAlchemy 2.0. The single cell of the result holds the table's row count.
sql_row_count = pd.read_sql_query(table_name_sql, con=engine).iloc[0, 0]
bool(sql_row_count == df_date.shape[0])