In [481]:
import os
import pandas as pd
import numpy as np
from configparser import ConfigParser
import sqlalchemy as db
from sqlalchemy.orm import sessionmaker, declarative_base, relationship
from sqlalchemy import MetaData, Table, Column, Integer, BigInteger, Date, Numeric, ForeignKey, CHAR, Boolean, Text, select, and_

In [482]:
# getting the database (postgresql) credentials from the .ini file
def get_db_ini(ini_file, section='postgresql'):
    """Read the key/value pairs of one section of an INI file.

    Parameters
    ----------
    ini_file : str or path-like
        Path to the INI file (e.g. ``database.ini``).
    section : str, default ``'postgresql'``
        Section whose options are returned.

    Returns
    -------
    dict
        Option name -> string value for ``section`` (values from a
        ``[DEFAULT]`` section, if any, are included as ConfigParser does).

    Raises
    ------
    FileNotFoundError
        If ``ini_file`` does not exist or could not be read.
    configparser.NoSectionError
        If ``section`` is not present in the file.
    """
    db_parser = ConfigParser()
    # ConfigParser.read() silently skips unreadable files and returns the list
    # of files it did read; an empty list would otherwise surface later as a
    # misleading "No section" error.
    if not db_parser.read(str(ini_file)):
        raise FileNotFoundError(f"Config file not found or unreadable: {ini_file}")

    return dict(db_parser.items(section))

# Connection settings (user, password, host, port, database) from database.ini
db_ini = get_db_ini("database.ini")

In [483]:
# Create the engine for the PostgreSQL database described in database.ini.
# URL.create() escapes special characters in the credentials (e.g. '@', ':',
# '/', '%' in a password), which plain string formatting of the URL does not.
engine = db.create_engine(
    db.engine.URL.create(
        drivername="postgresql",
        username=db_ini['user'],
        password=db_ini['password'],
        host=db_ini['host'],
        port=int(db_ini['port']),  # URL.create() requires an int port
        database=db_ini['database'],
    )
)

# Reflect the existing tables (this opens the first connection). Passing the
# engine to reflect() instead of MetaData(bind=...) avoids "bound metadata",
# which is deprecated in SQLAlchemy 1.4 and removed in 2.0.
metadata = MetaData()
metadata.reflect(bind=engine)

In [484]:
# Declarative base for the ORM models below. These classes describe the
# intended schema of the apps / subscriptions / transactions tables. The
# loading cells of this notebook insert through the reflected
# `metadata.tables[...]` objects, not through these classes; the classes are
# only used to create the schema if `create_all` (commented out at the end of
# this cell) is run.
Base = declarative_base()

class App(Base):
    """One row per application in the `apps` table."""
    __tablename__ = 'apps'
    # NOTE(review): Integer maps to a 32-bit INTEGER in PostgreSQL; confirm all
    # Apple IDs fit (subscriber_id on Transaction already uses BigInteger).
    app_apple_id = Column(Integer, primary_key=True, nullable=False)
    app_name = Column(Text)
    
    # One-to-many: an app has many subscriptions (mirrors Subscription.app)
    subscriptions = relationship('Subscription', back_populates='app')

class Subscription(Base):
    """One row per subscription product in the `subscriptions` table."""
    __tablename__ = 'subscriptions'
    subscription_apple_id = Column(Integer, primary_key=True)
    # FK to apps; ON DELETE CASCADE applies at the database level only if the
    # table was created with this constraint (e.g. via create_all)
    app_apple_id = Column(Integer, ForeignKey('apps.app_apple_id', ondelete='CASCADE'))
    subscription_group_id = Column(Integer)
    subscription_name = Column(Text)
    # Free-text fields copied as-is from the report
    subscription_duration = Column(Text)
    introductory_price_type = Column(Text)
    introductory_price_duration = Column(Text)
    marketing_opt_in_duration = Column(Text)

    # Relationship with the App class
    app = relationship('App', back_populates='subscriptions')

    # Relationship to use in the Transaction class to refer back to the Subscription
    transactions = relationship('Transaction', back_populates='subscription')

class Transaction(Base):
    """One row per subscription event in the `transactions` table."""
    __tablename__ = 'transactions'
    # Surrogate key generated by the database; the loaded transaction frames
    # do not contain a transaction_id column
    transaction_id = Column(Integer, primary_key=True, autoincrement=True)
    event_date = Column(Date)
    subscriber_id = Column(BigInteger)
    # FK to subscriptions; ON DELETE CASCADE at the database level (see above)
    subscription_apple_id = Column(Integer, ForeignKey('subscriptions.subscription_apple_id', ondelete='CASCADE'))
    # Monetary amounts are stored with 2 decimal places; the *_usd columns
    # hold the values converted with convert_to_usd during the transform step
    customer_price = Column(Numeric(16, 2))
    customer_currency = Column(CHAR(3))  # presumably an ISO 4217 code — confirm
    customer_price_usd = Column(Numeric(16, 2))
    developer_proceeds = Column(Numeric(16, 2))
    proceeds_currency = Column(CHAR(3))  # presumably an ISO 4217 code — confirm
    developer_proceeds_usd = Column(Numeric(16, 2))
    # Boolean flags derived from the report's "Yes"/blank columns
    preserved_pricing = Column(Boolean)
    proceeds_reason = Column(Text)
    client = Column(Text)
    device = Column(Text)
    country = Column(CHAR(2))  # presumably an ISO 3166-1 alpha-2 code — confirm
    subscriber_id_reset = Column(Boolean)
    refund = Column(Boolean)
    purchase_date = Column(Date)
    units = Column(Integer)

    # Relationship with the Subscription class
    subscription = relationship('Subscription', back_populates='transactions')

# Uncomment to create the tables from the models above (no-op for tables
# that already exist)
#Base.metadata.create_all(engine)

In [485]:
def convert_to_usd(amount, currency, rates=None):
    """Convert ``amount`` expressed in ``currency`` to US dollars.

    Parameters
    ----------
    amount : float
        Monetary amount in ``currency``.
    currency : str
        Currency code used as the key into the rate table. ``'USD'`` amounts
        are returned unchanged.
    rates : dict, optional
        Mapping of currency code -> rate. The amount is *divided* by the rate,
        so rates are taken to be units of ``currency`` per 1 USD
        (NOTE(review): confirm against rates.json). Defaults to the
        module-level ``RATES`` table loaded from rates.json.

    Returns
    -------
    float
        ``amount`` unchanged when the currency is USD or the amount is 0,
        otherwise ``amount / rate`` rounded to 2 decimals. A currency missing
        from the table falls back to a rate of 1, i.e. the amount is passed
        through (rounded) as if it were already USD.
    """
    if currency == 'USD' or amount == 0:
        return amount

    # Resolved lazily so callers that pass ``rates`` do not need the global
    table = RATES if rates is None else rates
    rate = table.get(currency, 1)  # Default to a rate of 1 if the currency rate isn't available
    return round(amount / rate, 2)

# Load the currency exchange rates used by convert_to_usd from rates.json.
# A currency exchange API can be used if better precision is needed.
import json

# Explicit UTF-8: JSON is UTF-8 by specification, while open()'s default
# encoding depends on the platform/locale (e.g. cp1252 on Windows).
with open('rates.json', 'r', encoding='utf-8') as f:
    RATES = json.load(f)

In [486]:
def transform_df(df):
    
    df['Event Date'] = pd.to_datetime(df['Event Date'], format="%Y-%m-%d")
    df['Purchase Date'] = pd.to_datetime(df['Purchase Date'], errors='coerce').replace({pd.NaT: None}, inplace=True) #convert to datetime with NaN in the col

    df['customer_price_usd'] = df['Customer Price'].copy()
    df['customer_price_usd'] = df.apply(lambda row: convert_to_usd(row['Customer Price'], row['Customer Currency']), 
                                           axis=1)
    
    df['developer_proceeds_usd'] = df['Developer Proceeds'].copy()
    df['developer_proceeds_usd'] = df.apply(lambda row: convert_to_usd(row['Developer Proceeds'], row['Proceeds Currency']),
                                            axis=1)
    
    # convert all Yes-blank columns to boolean 
    df['Preserved Pricing'] = df['Preserved Pricing'] == 0
    df['Subscriber ID Reset'] = df['Subscriber ID Reset'] == 0
    df['Refund'] = df['Refund'] == 0
    
    # rename columns
    cols = ['event_date', 'app_name', 'app_apple_id', 'subscription_name',
            'subscription_apple_id', 'subscription_group_id',
            'subscription_duration', 'introductory_price_type',
            'introductory_price_duration', 'marketing_opt_in_duration',
            'customer_price', 'customer_currency', 'developer_proceeds',
            'proceeds_currency', 'preserved_pricing', 'proceeds_reason', 'client',
            'device', 'country', 'subscriber_id', 'subscriber_id_reset', 'refund',
            'purchase_date', 'units', 'customer_price_usd',
            'developer_proceeds_usd']
    df.columns = cols
    
    # separating dataframe for each table 
    apps_data = df[['app_apple_id', 'app_name']].copy().drop_duplicates()
    subscriptions_data = df[['subscription_apple_id', 'app_apple_id', 'subscription_group_id',
                             'subscription_name', 'subscription_duration', 'introductory_price_type',
                             'introductory_price_duration', 'marketing_opt_in_duration']].copy()
    transactions_data = df[['event_date', 'subscriber_id', 'subscription_apple_id',
                            'customer_price', 'customer_currency', 'customer_price_usd',
                            'developer_proceeds', 'proceeds_currency', 'developer_proceeds_usd',
                            'preserved_pricing', 'proceeds_reason', 'client', 'device', 'country',
                            'subscriber_id_reset', 'refund', 'purchase_date', 'units']].copy()

    def clear_subs(column_name):
        for index in subscriptions_data['subscription_apple_id'].unique():
            value = subscriptions_data[subscriptions_data['subscription_apple_id'] == index][column_name].drop_duplicates().sort_values().iloc[0]

            subscriptions_data.loc[
                subscriptions_data['subscription_apple_id'] == index,
                column_name
            ] = subscriptions_data.loc[
                subscriptions_data['subscription_apple_id'] == index,
                column_name
            ].fillna(value)

    for name in ['introductory_price_type', 'introductory_price_duration']:
        clear_subs(column_name=name)

    #subscriptions_data['marketing_opt_in_duration'].fillna(value='no_data', inplace=True)
    subscriptions_data = subscriptions_data.replace({np.nan: None})
    transactions_data[['proceeds_reason', 'client', 'device', 'country']] = transactions_data[['proceeds_reason', 'client', 'device', 'country']].replace({np.nan: None, '': None})
        
    subscriptions_data.drop_duplicates()
    
    return apps_data, subscriptions_data, transactions_data

In [487]:
def insert_dataframe(df, table, unique_columns, db_session=None):
    """
    Insert the rows of ``df`` into ``table``, skipping rows that already exist.

    Existence is checked row by row before inserting:

    * if ``unique_columns`` is non-empty, a row exists when a record with the
      same values in those columns is already in the table;
    * if ``unique_columns`` is empty, every column of ``df`` that is also a
      column of ``table`` is compared.

    Only the columns of ``df`` that exist in ``table`` are inserted. All work
    is committed once, after the last row.

    Parameters
    ----------
    df : pandas.DataFrame
        Rows to insert; column names must match the table's column names.
    table : sqlalchemy.Table
        Target table (e.g. one of the reflected ``metadata.tables``).
    unique_columns : list of str
        Columns that identify a row; an empty list means "compare all
        shared columns".
    db_session : sqlalchemy.orm.Session, optional
        Session to execute with. Defaults to the notebook-level ``session``
        so existing calls keep working.
    """
    active_session = session if db_session is None else db_session

    # Columns shared by the frame and the table; computed once, not per row
    table_cols = [col for col in df.columns if col in table.c]
    key_cols = list(unique_columns) if len(unique_columns) > 0 else table_cols

    for _, row in df.iterrows():
        conditions = [table.c[col] == row[col] for col in key_cols]
        # Only "is there any match?" matters: LIMIT 1 + first() answers that
        # and, unlike scalar_one_or_none(), does not raise when several
        # matching rows already exist.
        stmt = select(table).where(and_(True, *conditions)).limit(1)
        existing = active_session.execute(stmt).first()

        # If the record does not exist, insert a new record
        if existing is None:
            new_record = {col: row[col] for col in table_cols}
            active_session.execute(table.insert().values(**new_record))

    active_session.commit()

In [488]:
Session = sessionmaker(bind=engine)
session = Session()

try:
    # Load every .txt report (tab-delimited) in the dataset directory, in
    # filename order so the loading order is deterministic across platforms.
    directory_path = "itunes_dataset"

    # The reflected tables do not change between files; look them up once
    apps_table = metadata.tables['apps']
    subscriptions_table = metadata.tables['subscriptions']
    transactions_table = metadata.tables['transactions']

    for filename in sorted(os.listdir(directory_path)):
        if not filename.endswith(".txt"):
            continue

        file_path = os.path.join(directory_path, filename)
        # Announce the file before reading it so a read error can be attributed
        print(f'Loading from: {file_path}', end=' ')
        data = pd.read_csv(file_path, delimiter='\t')

        apps_data, subscriptions_data, transactions_data = transform_df(data)

        # apps -> subscriptions -> transactions: parents first, matching the
        # foreign keys declared on the ORM models
        insert_dataframe(apps_data, apps_table, unique_columns=['app_apple_id'])
        insert_dataframe(subscriptions_data, subscriptions_table, unique_columns=['subscription_apple_id'])
        insert_dataframe(transactions_data, transactions_table, unique_columns=[])

        print('Done.')

except Exception as e:
    print("Error occurred:", e)
    session.rollback()
    # Re-raise so a failed load is visible (and stops "Run All") instead of
    # looking like a successful cell. Data committed by earlier
    # insert_dataframe calls is not undone by this rollback.
    raise

finally:
    session.close()
    engine.dispose()

Loading from: itunes_dataset\20190201.txt Done.
Loading from: itunes_dataset\20190202.txt Done.
Loading from: itunes_dataset\20190203.txt Done.
Loading from: itunes_dataset\20190204.txt Done.
Loading from: itunes_dataset\20190205.txt Done.
Loading from: itunes_dataset\20190206.txt Done.
Loading from: itunes_dataset\20190207.txt Done.
Loading from: itunes_dataset\20190208.txt Done.
Loading from: itunes_dataset\20190209.txt Done.
Loading from: itunes_dataset\20190210.txt Done.
