In [1]:
import tools.conf
import psycopg
from psycopg import sql
import pandas as pd
import numpy as np

pd.set_option('display.max_columns', None)

In [13]:
def add_1_field(data:list, table_name:str, field_name: str)-> None:
    """
    Skeleton function for adding data to single column tables.

    :param data: List of strings or integers representing table contents
    :param table_name: String reprexsenting name of the table into which data is going to be added
    :raise psycopg.DataError: If data type does not match table restrictions
    :return: None
    """
    
    query = sql.SQL('INSERT INTO {table} ({field}) VALUES (%s)')

    for elem in data:
        cur.execute(
            query.format(
                table=sql.Identifier(f'{table_name}'),
                field=sql.Identifier(f'{field_name}')), (elem,)
        )
    conn.commit()

def iter_over_inputs(data_set:list[dict[list,str,str]])-> None:
    """
    Main loop for iteration over one column tables.

    :param data_set: List containing dicts with data, table name and field/column name.
    List contains strings or integers representing the data to be added into selected tables.
    :param table_name: String representing name of the table into which data is going to be added
    :raise KeyError: If key name does not match the pattern
    :return: None
    """
    
    for elem in data_set:
        data = elem['data']
        table = elem['table']
        field = elem['field']
        new_data, data = check_for_data_1_field(data, table, field)
        if new_data:
            add_1_field(data, table, field)

def check_for_data_1_field(data_:list, table_name:str, field_name:str)-> tuple[bool,list[str]]:
    """
    Skeleton function for checking if there is data inside each of one column tables.
    Ads data if there are any new entries, skips if no new data was found. 
    If DB is empty returns immediately.

    :param data_: List containing data to be checked and added. Data is of str or int types.
    :param table_name: String representing name of the table into which data is going to be added
    :param field_name: String representing name of the field/ column name
    :raise psycopg.DataError: If data type does not match table restrictions
    :return: A tuple containing bool for logic purposes, anbd the data set to be added
    :rtype: tuple[bool, list[str/int]]
    """
    
    query = sql.SQL('SELECT {field} FROM {table}')
    cur.execute(
        query.format(
            table=sql.Identifier(table_name),
            field=sql.Identifier(field_name))
    )
    
    in_db = pd.DataFrame([elem[0] for elem in cur.fetchall()])
    in_db.rename(columns={0: field_name}, inplace=True)
    
    if len(in_db) == 0:
        return (True, data_)
    else:
        if field_name == 'date':
            in_db['date'] = pd.to_datetime(in_db['date'])
            in_db = list(in_db['date'])
        else: 
            in_db = list(in_db[field_name])
        
        if len(in_db) != 0 and table_name in avoid_adding:
            print(f'>>> Not adding to {table_name}. No new data found.')
            return (False, list(''))
        in_df = pd.DataFrame(data_)
        in_df = in_df.rename(columns={0: field_name})
        if field_name == 'date':
            in_df['date'] = pd.to_datetime(in_df['date'])
        
        # we check if df contains new data in comparison to DB
        new_data = in_df[~in_df.isin(in_db)].dropna()
        new_data = new_data[field_name]
        new_data = list(new_data)
        
        if len(new_data) != 0:
            print(f'>>> Adding to {table_name}. New data found.')
            return (True, new_data)
        else:
            return (False, list(''))

def add_3_fields(data_set:dict[pd.DataFrame,str,list])-> None:
    """
    Function adding data into mediums table, which consists of 3 columns.

    :param data_set: A dict contaning data to be added, table name, and field / column name.
    Data is a Pandas DataFrame, table name and field name are both strings.
    :raise KeyError: If key name does not match the pattern
    :raise psycopg.DataError: If table or field names doesn't match those in the DB
    :return: None
    """
    
    col = data_set['data'].columns.values.tolist()
    
    query = sql.SQL('INSERT INTO {table} ({field1}, {field2}, {field3}) VALUES (%s, %s ,%s)')

    for _, elem in data_set['data'].iterrows():
        cur.execute(
            query.format(
                table=sql.Identifier(data_set['table']),
                field1=sql.Identifier(data_set['fields'][0]),
                field2=sql.Identifier(data_set['fields'][1]),
                field3=sql.Identifier(data_set['fields'][2])), 
                (elem[col[0]], elem[col[1]], elem[col[2]],)
        )
        
    conn.commit()
    
def check_for_data_3_fields(fields:list[str], table_name: str, submediums: pd.DataFrame)-> tuple[bool,pd.DataFrame]:
    """
    Returns a bool for logic purposes and data to be added into mediums table.
    If DB is empty returns original DF. During data update process returns the data not present in the DB
    or indicates there is nothing to be added.

    :param fields: A list containing field / column names represented as a str
    :param table_name: Name of the table into which data is going to be added as a str
    :param submediums: Pandas DataFrame containing data to add.
    :raise psycopg.DataError: If data type does not match table restrictions
    :return: Tuple containing bool for logic purposes and a Pandas DataFrame 
    as data to be added into the DB during the update
    :rtype: tuple[bool, pd.DataFrame]
    """
    
    query = sql.SQL('SELECT id, {field1}, {field2}, {field3} FROM {table}')
    cur.execute(
        query.format(
            table=sql.Identifier(table_name),
            field1=sql.Identifier(fields[0]),
            field2=sql.Identifier(fields[1]),
            field3=sql.Identifier(fields[2]))
    )
    
    in_db = pd.DataFrame(cur.fetchall())
    in_db.rename(columns={0: 'id',1: fields[0], 2: fields[1], 3: fields[2]}, inplace=True)
    
    if len(in_db) == 0:
        return (True, submediums)
    else:
        in_db = list(in_db[fields[0]])
        in_df = submediums.copy()
        # we check if df contains new data in comparison to DB
        new_data = in_df[~in_df.isin(in_db)].dropna()
        
        if len(new_data) != 0 :
            print(f'>>> Adding to {table_name}. New data found.')
            return (True, new_data)
        print(f'>>> Not adding to {table_name}. No new data found.')
        return (False, submediums)

def add_8_fields(data_set:dict[pd.DataFrame,str,list[str]])-> None:
    """
    Function adding data into ad_time_details table, which consists of 8 columns.

    :param data_set: A dict contaning data to be added, table name, and field / column name.
    Data is a Pandas DataFrame, table name and field name are both strings.
    :raise KeyError: If key name does not match the pattern
    :raise psycopg.DataError: If table or field names doesn't match those in the DB
    :return: None
    """
    
    col = data_set['data'].columns.values.tolist()
    
    query = sql.SQL(
        '''
        INSERT INTO {table} ({field1}, {field2}, {field3}, {field4}, {field5}, {field6}, {field7}, {field8}) 
        VALUES (%s, %s ,%s, %s ,%s, %s ,%s ,%s)
        ''')

    for _, elem in data_set['data'].iterrows():
        cur.execute(
            query.format(
                table=sql.Identifier(data_set['table']),
                field1=sql.Identifier(data_set['fields'][0]),
                field2=sql.Identifier(data_set['fields'][1]),
                field3=sql.Identifier(data_set['fields'][2]),
                field4=sql.Identifier(data_set['fields'][3]),
                field5=sql.Identifier(data_set['fields'][4]),
                field6=sql.Identifier(data_set['fields'][5]),
                field7=sql.Identifier(data_set['fields'][6]),
                field8=sql.Identifier(data_set['fields'][7])), 
                (elem[col[0]], elem[col[1]], elem[col[2]], 
                 elem[col[3]], elem[col[4]], elem[col[5]], 
                 elem[col[6]], elem[col[7]],
                )
        )
        
    conn.commit()

def add_10_fields(data_set:dict[pd.DataFrame,str,list[str]])-> None:
    """
    Function adding data into ad_time_details table, which consists of 10 columns.

    :param data_set: A dict contaning data to be added, table name, and field / column name.
    Data is a Pandas DataFrame, table name and field name are both strings.
    :raise KeyError: If key name does not match the pattern
    :raise psycopg.DataError: If table or field names doesn't match those in the DB
    :return: None
    """
    
    col = data_set['data'].columns.values.tolist()
    
    query = sql.SQL(
        '''
        INSERT INTO {table} ({field1}, {field2}, {field3}, {field4}, {field5}, {field6}, {field7}, {field8}, {field9}, {field10}) 
        VALUES (%s, %s ,%s, %s ,%s, %s ,%s ,%s ,%s ,%s)
        ''')

    for _, elem in data_set['data'].iterrows():
        cur.execute(
            query.format(
                table=sql.Identifier(data_set['table']),
                field1=sql.Identifier(data_set['fields'][0]),
                field2=sql.Identifier(data_set['fields'][1]),
                field3=sql.Identifier(data_set['fields'][2]),
                field4=sql.Identifier(data_set['fields'][3]),
                field5=sql.Identifier(data_set['fields'][4]),
                field6=sql.Identifier(data_set['fields'][5]),
                field7=sql.Identifier(data_set['fields'][6]),
                field8=sql.Identifier(data_set['fields'][7]),
                field9=sql.Identifier(data_set['fields'][8]),
                field10=sql.Identifier(data_set['fields'][9])), 
                (elem[col[0]], elem[col[1]], elem[col[2]], 
                 elem[col[3]], elem[col[4]], elem[col[5]], 
                 elem[col[6]], elem[col[7]], elem[col[8]], 
                 elem[col[9]],
                )
        )
        
    conn.commit()

def get_id_for_submediums(fields:list[str], table_:str)-> tuple[bool, pd.DataFrame]:
    """
    Gets IDs from reference tables to mediums table. 
    Mainly connects submediums with broadcaster and reach tables.
    Returns a bool for logic purposes and data to be added into mediums.

    :param fields: A list containing field / column names represented as a str
    :param table_: Name of the table out of which the data is going to be pulled, 
    represented as a str
    :raise psycopg.DataError: If data type does not match table restrictions
    :return: Tuple containing bool for logic purposes and a Pandas DataFrame 
    as data to be added into the DB during the update or initial DB fill.
    :rtype: tuple[bool, pd.DataFrame]
    """
    
    submediums = df[['submedium', 'wydawca_nadawca', 'zasięg medium']].sort_values(by='submedium')
    submediums.drop_duplicates(subset=['submedium'], keep='first', inplace=True, ignore_index=True)
    
    if sum(submediums.value_counts()) != submediums.index.max() + 1:
        exit('Max index different than the length of the list.')
    
    query1 = sql.SQL('SELECT {fields} FROM {table}').format(
    fields=sql.SQL(',').join([
        sql.Identifier('broadcaster'),
        sql.Identifier('id')
    ]),
    table=sql.Identifier('broadcasters'))
    cur.execute(query1)
    broadcasters = dict(cur.fetchall())

    query2 = sql.SQL('SELECT {fields} FROM {table}').format(
        fields=sql.SQL(',').join([
            sql.Identifier('reach'),
            sql.Identifier('id')
        ]),
        table=sql.Identifier('ad_reach'))
    cur.execute(query2)
    ad_reach = dict(cur.fetchall())

    submediums['wydawca_nadawca'] = submediums['wydawca_nadawca'].map(broadcasters)
    submediums['zasięg medium'] = submediums['zasięg medium'].map(ad_reach)
    
    trigger, submediums = check_for_data_3_fields(fields, table_, submediums)
    
    return (trigger, submediums)

def get_id_for_ad_time(fields: list[str], table_: str)-> tuple[bool,pd.DataFrame]:
    """
    Gets IDs from reference tables to ad_time_details table. 
    Mainly connects time details of singular ad emission with other tables containing details via IDs.
    This function populates one of two core tables in this DB.
    Returns a bool for logic purposes and data to be added into mediums.

    :param fields: A list containing field / column names represented as a str
    :param table_: Name of the table out of which the data is going to be pulled, 
    represented as a str
    :raise psycopg.DataError: If data type does not match table restrictions
    :return: Tuple containing bool for logic purposes and a Pandas DataFrame 
    as data to be added into the DB during the update or initial DB fill.
    :rtype: tuple[bool, pd.DataFrame]
    """
    
    ad_time = df[['data', 'godzina_bloku_reklamowego', 'gg', 'mm', 'dl_mod', 'daypart', 'dł_ujednolicona', 'ad_time_details']]
    ad_time.index = ad_time.index + get_index_val(table_)

    query1 = sql.SQL('SELECT {fields} FROM {table}').format(
        fields=sql.SQL(',').join([
            sql.Identifier('length'),
            sql.Identifier('id')
            ]),
        table=sql.Identifier('unified_lengths'))

    cur.execute(query1)
    unified_lengths = dict(cur.fetchall())

    query2 = sql.SQL('SELECT {fields} FROM {table}').format(
        fields=sql.SQL(',').join([
            sql.Identifier('daypart'),
            sql.Identifier('id')
            ]),
        table=sql.Identifier('dayparts'))

    cur.execute(query2)
    dayparts = dict(cur.fetchall())

    ad_time.loc[:, 'daypart'] = ad_time['daypart'].map(dayparts)
    ad_time.loc[:, 'dł_ujednolicona'] = ad_time['dł_ujednolicona'].map(unified_lengths)
    
    trigger, ad_time = get_min_max_date(fields, table_, ad_time)
    
    return (trigger, ad_time)

def get_id_for_ads_desc(fields: list[str], table_: str)-> tuple[bool,pd.DataFrame]:
    """
    Gets IDs from reference tables to ads_desc table. 
    Mainly connects other tables and data of singular ad emission via IDs with other tables.
    This function populates one of two core tables in this DB.
    Returns a bool for logic purposes and data to be added into mediums.

    :param fields: A list containing field / column names represented as a str
    :param table_: Name of the table out of which the data is going to be pulled, 
    represented as a str
    :raise psycopg.DataError: If data type does not match table restrictions
    :return: Tuple containing bool for logic purposes and a Pandas DataFrame 
    as data to be added into the DB during the update or initial DB fill.
    :rtype: tuple[bool, pd.DataFrame]
    """
    
    ads_desc = df[['data', 'opis_reklamy', 'kod_reklamy', 'brand', 'submedium', 'ad_time_details', 'produkt(4)', 'koszt', 'l_emisji', 'typ_reklamy']]
    ads_desc.index = ads_desc.index + get_index_val(table_)

    query1 = sql.SQL('SELECT {fields} FROM {table}').format(
        fields=sql.SQL(',').join([
            sql.Identifier('brand'),
            sql.Identifier('id')
            ]),
        table=sql.Identifier('brands'))

    cur.execute(query1)
    brands_id = dict(cur.fetchall())


    query2 = sql.SQL('SELECT {fields} FROM {table}').format(
        fields=sql.SQL(',').join([
            sql.Identifier('submedium'),
            sql.Identifier('id')
            ]),
        table=sql.Identifier('mediums'))

    cur.execute(query2)
    medium_id = dict(cur.fetchall())


    query3 = sql.SQL('SELECT {fields} FROM {table}').format(
        fields=sql.SQL(',').join([
            sql.Identifier('ad_code'),
            sql.Identifier('id')
            ]),
        table=sql.Identifier('ad_time_details'))

    cur.execute(query3)
    ad_time_details_id = dict(cur.fetchall())


    query4 = sql.SQL('SELECT {fields} FROM {table}').format(
        fields=sql.SQL(',').join([
            sql.Identifier('product_type'),
            sql.Identifier('id')
            ]),
        table=sql.Identifier('product_types'))

    cur.execute(query4)
    product_type_id = dict(cur.fetchall())

    ads_desc.loc[:, 'brand'] = ads_desc['brand'].map(brands_id)
    ads_desc.loc[:, 'submedium'] = ads_desc['submedium'].map(medium_id)
    ads_desc.loc[:, 'ad_time_details'] = ads_desc['ad_time_details'].map(ad_time_details_id)
    ads_desc.loc[:, 'produkt(4)'] = ads_desc['produkt(4)'].map(product_type_id)
    
    trigger, ads_desc = get_min_max_date(fields, table_, ads_desc)
    
    return (trigger, ads_desc)

def get_colum_names(table_name:str)->list[str]:
    """
    A function which returns the names of selected table from the DB.

    :raise psycopg.DatabaseError: If column names does not match DB contents
    :return: List containing all the column names present in selected table. 
    :rtype: list[str]
    """
    
    query = sql.SQL(
    '''
    SELECT c.column_name 
    FROM information_schema.columns c 
    WHERE c.table_name = %s
    ORDER BY c.ordinal_position;
    ''').format()
    cur.execute(query, (table_name,))
    table_data = cur.fetchall()
    temp = []
    for elem in table_data[1:]:
        temp.append(elem[0])
    table_data = temp
    
    return table_data

def get_index_val(table_name: str)-> int:
    """
    Function gets max index value from the selected table and returns it as an integer increased by one.
    When the table is empty, returns 1

    :param table_name: Name of the table out of which the data is going to be pulled, 
    represented as a str
    :raise psycopg.DataError: If data type does not match table restrictions
    :return: number representiung max index value of selected table icreased by 1
    :rtype: int
    """
    
    query = sql.SQL('SELECT MAX(id) FROM {table};')
    cur.execute(query.format(table=sql.Identifier(table_name)))
    ind = cur.fetchone()
    if ind[0] == None:
        return 1
    else:
        return ind[0] + 1

def get_min_max_date(fields: list[str], table_: str, dataframe: pd.DataFrame)-> tuple[bool, pd.DataFrame]:
    """
    Gets max and min dates from selected table. Then checks if dates present in the DF passed as a parameter
    are outside of dates range. If so, allows data insertion into the DB, if not it informs the user, 
    and proceedes with the rest of the code.

    :param fields: A list containing field / column names represented as a str
    :param table_: Name of the table out of which the data is going to be pulled, 
    represented as a str
    :param dataframe: Pandas DataFrame with the new data to be checked if not present in selected table
    :raise psycopg.DatabaseError: If column names does not match DB contents
    :return: Tuple containing bool for logic purposes and a Pandas DataFrame 
    as data to be added into the DB during the update or initial DB fill.
    :rtype: tuple[bool, pd.DataFrame]
    """
    
    # Get max date from DB
    query = sql.SQL('SELECT MAX({field1}) FROM {table};')
    cur.execute(
        query.format(
            table=sql.Identifier(table_),
            field1=sql.Identifier(fields[0])
        )
    )
    in_db_max = pd.Timestamp(cur.fetchone()[0])
    
    # Get min date from DB
    query = sql.SQL('SELECT MIN({field1}) FROM {table};')
    cur.execute(
        query.format(
            table=sql.Identifier(table_),
            field1=sql.Identifier(fields[0])
        )
    )
    in_db_min = pd.Timestamp(cur.fetchone()[0])
    
    # Get max and min date from DF
    in_df_max = dataframe['data'].max()
    in_df_min = dataframe['data'].min()
    
    # Check if min and max dates from DF are between range of dates from DB
    min_df_in_db_range = in_db_min <= in_df_min <= in_db_max
    max_df_in_db_range = in_db_min <= in_df_max <= in_db_max
    
    # Main logic add if empty or when dates not present in DB.
    if  pd.isnull(in_db_max) or pd.isnull(in_db_min) :
        return (True, dataframe)
    elif not min_df_in_db_range and not max_df_in_db_range:
        print(f'>>> Adding to {table_}. New data found.')
        return (True, dataframe)
    else:
        print(f'>>> Not adding to {table_}. One or more dates already in DB.')
        print(f'>>> Check the data you want to insert into DB.')
        return (False, dataframe)


In [14]:
# Openes connection to the DB
print('Oppening connection.')
conn = psycopg.connect(
    f'''dbname={tools.conf.DB}
        user={tools.conf.USER}
        host={tools.conf.HOST}
        port={tools.conf.PORT}
    '''
)

cur = conn.cursor()

  df['ad_time_details'] = df[['data', 'kod_reklamy']].apply(lambda x: f'{x[0]} - {x[1]} - {ind[x.name]}', axis=1)


In [None]:
print('Creating DataFrame.')
# Reads the dataframe
df = pd.read_csv('./data/baza2.csv', delimiter=';', thousands=',', dtype={'dł_ujednolicona': 'object'}, encoding='utf-8', parse_dates=['data'])
df.sort_values(by='data', axis=0, inplace=True)
df.reset_index(inplace=True)
df.drop('index', axis=1, inplace=True)
ind = df.index.values + get_index_val('ads_desc')
df['ad_time_details'] = df[['data', 'kod_reklamy']].apply(lambda x: f'{x["data"]} - {x["kod_reklamy"]} - {ind[x.name]}', axis=1)

In [15]:
# Create datasets for simple tables
dow2 = ['Poniedziałek', 'Wtorek', 'Środa', 'Czwartek', 'Piątek',
        'Sobota', 'Niedziela']
months = [
    'Styczeń', 'Luty', 'Marzec', 'Kwiecień', 'Maj',
    'Czerwiec', 'Lipiec', 'Sierpień', 'Wrzesień',
    'Październik', 'Listopad', 'Grudzień'
]
dates = df['data'].unique()
brands = df['brand'].sort_values().unique()
lengths = df['dł_ujednolicona'].sort_values().unique()
dayparts = df['daypart'].unique()
product_types = df['produkt(4)'].sort_values().unique()
broadcasters = df['wydawca_nadawca'].sort_values().unique()
reaches = df['zasięg medium'].unique()

data_set = [{'data': dow2, 'table': 'pl_dow_names', 'field': 'dow_name'},
            {'data': months, 'table': 'pl_month_names', 'field': 'month_name'},
            {'data': dates, 'table': 'date_time', 'field': 'date'},
            {'data': brands, 'table': 'brands', 'field': 'brand'},
            {'data': lengths, 'table': 'unified_lengths', 'field': 'length'},
            {'data': dayparts, 'table': 'dayparts', 'field': 'daypart'},
            {'data': product_types, 'table': 'product_types', 'field': 'product_type'},
            {'data': broadcasters, 'table': 'broadcasters', 'field': 'broadcaster'},
            {'data': reaches, 'table': 'ad_reach', 'field': 'reach'},
            ]

avoid_adding = ['pl_dow_names', 'pl_month_names', 'dayparts', 'ad_reach']

In [16]:
# Inserting data into simple tables
print('Inserting data to one input tables.')
try:
    iter_over_inputs(data_set)
except psycopg.OperationalError as e:
    conn.close()
    print('Failed to input the data.')
    print(f'Error: {e}')

# Create and insert data into mediums table
print('Inserting data to the three input table.')
fields = get_colum_names('mediums')
trigger, submediums = get_id_for_submediums(fields, 'mediums')
data_set2 = {'data': submediums, 'table': 'mediums', 'fields': fields}
if trigger:
    try:
        add_3_fields(data_set2)
    except psycopg.OperationalError as e:
        conn.close()
        print('Failed to input the data.')
        print(f'Error: {e}')

# Create and insert data into ad_time_details table
print('Inserting data to the eight input table.')
fields = get_colum_names('ad_time_details')
trigger, ad_time = get_id_for_ad_time(fields, 'ad_time_details')
data_set3 = {'data': ad_time, 'table': 'ad_time_details', 'fields': fields}
if trigger:
    try:
        add_8_fields(data_set3)
    except psycopg.OperationalError as e:
        conn.close()
        print('Failed to input the data.')
        print(f'Error: {e}')

# Create and insert data into ad_time_details table
print('Inserting data to the ten input table.')
fields = get_colum_names('ads_desc')
trigger, ads_desc = get_id_for_ads_desc(fields, 'ads_desc')
data_set4 = {'data': ads_desc, 'table': 'ads_desc', 'fields': fields}
if trigger:
    try:
        add_10_fields(data_set4)
    except psycopg.OperationalError as e:
        conn.close()
        print('Failed to input the data.')
        print(f'Error: {e}')

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  ads_desc['brand'] = ads_desc['brand'].map(brands_id)
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  ads_desc['submedium'] = ads_desc['submedium'].map(medium_id)
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  ads_desc['ad_time_details'] = ads_desc['ad_time_details'].map(ad_time_details_id)
A value i

In [1]:
import sqlite3
import pandas as pd
import numpy as np


pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', None)

In [2]:
def get_index_val(table_name: str, cur: sqlite3.Cursor)-> int:
    """
    Function gets max index value from the selected table and returns it as an integer increased by one.
    When the table is empty, returns 1

    :param table_name: Name of the table out of which the data is going to be pulled, 
    represented as a str
    :param cur: Is a cursor object created for con object
    :raise sqlite3.ProgrammingError: If there are any error raised by the DB-API
    :daise sqlite3.OperationalError: If any eceptions on the DB side are raised, i.g. DB being locked
    :return: number representiung max index value of selected table icreased by 1
    :rtype: int
    """
    
    query = (f'SELECT MAX(id) FROM {table_name};')
    cur.execute(query)
    ind = cur.fetchone()
    if ind[0] == None:
        return 1
    else:
        return ind[0] + 1
    
def iter_over_inputs(data_set:list[dict[list,str,str]], con: sqlite3.Connection, 
                        cur: sqlite3.Cursor, avoid_adding: list[str])-> None:
    """
    Main loop for iteration over one column tables.

    :param data_set: List containing dicts with data, table name and field/column name.
    List contains strings or integers representing the data to be added into selected tables.
    :param table_name: String representing name of the table into which data is going to be added
    :param con: Is a connection object, pointing to a DB
    :param cur: Is a cursor object created for con object
    :param avoid_adding: List of tablet which doesn't need to be updated
    :raise KeyError: If key name does not match the pattern
    :raise sqlite3.ProgrammingError: If there are any error raised by the DB-API
    :daise sqlite3.OperationalError: If any eceptions on the DB side are raised, i.g. DB being locked
    :return: None
    """
    
    for elem in data_set:
        data = elem['data']
        table = elem['table']
        field = elem['field']
        new_data, data = check_for_data_1_field(data, table, field, cur, avoid_adding)
        if new_data:
            add_1_field(data, table, field, con, cur)

def add_1_field(data:list, table_name:str, field_name: str, 
                con: sqlite3.Connection, cur: sqlite3.Cursor)-> None:
    """
    Skeleton function for adding data to single column tables.

    :param data: List of strings or integers representing table contents
    :param table_name: String reprexsenting name of the table into which data is going to be added
    :param con: Is a connection object, pointing to a DB
    :param cur: Is a cursor object created for con object
    :raise sqlite3.ProgrammingError: If there are any error raised by the DB-API
    :daise sqlite3.OperationalError: If any eceptions on the DB side are raised, i.g. DB being locked
    :return: None
    """
    
    query = (f'INSERT INTO {table_name} ({field_name}) VALUES(:name);')
    for elem in data:
        to_add = {'name': str(elem)}
        cur.execute(query, to_add)
    con.commit()

def check_for_data_1_field(data_:list, table_name:str, field_name:str, 
                            cur: sqlite3.Cursor, avoid_adding: list[str])-> tuple[bool,list[str]]:
    """
    Skeleton function for checking if there is data inside each of one column tables.
    Ads data if there are any new entries, skips if no new data was found. 
    If DB is empty returns immediately.

    :param data_: List containing data to be checked and added. Data is of str or int types.
    :param table_name: String representing name of the table into which data is going to be added
    :param field_name: String representing name of the field/ column name
    :param cur: Is a cursor object created for con object
    :param avoid_adding: List of tablet which doesn't need to be updated
    :raise sqlite3.ProgrammingError: If there is an error raised by the DB-API
    :daise sqlite3.OperationalError: If any eceptions on the DB side are raised, i.g. DB being locked
    :return: A tuple containing bool for logic purposes, anbd the data set to be added
    :rtype: tuple[bool, list[str/int]]
    """
    
    query = (f'SELECT {field_name} FROM {table_name}')
    cur.execute(query)
    
    in_db = pd.DataFrame([elem[0] for elem in cur.fetchall()])
    in_db.rename(columns={0: field_name}, inplace=True)
    
    if len(in_db) == 0:
        return (True, data_)
    else:
        if field_name == 'data':
            # in_db['data'] = pd.to_datetime(in_db['data'])
            in_db = list(in_db['data'])
        else: 
            in_db = list(in_db[field_name])
        
        if len(in_db) != 0 and table_name in avoid_adding:
            print(f'>>> Not adding to {table_name}. No new data found.')
            return (False, list(''))
        in_df = pd.DataFrame(data_)
        in_df = in_df.rename(columns={0: field_name})
        # if field_name == 'data':
        #     pass
            # in_df['data'] = pd.to_datetime(in_df['data'])
        
        # we check if df contains new data in comparison to DB
        new_data = in_df[~in_df.isin(in_db)].dropna()
        new_data = new_data[field_name]
        new_data = list(new_data)
        
        if len(new_data) != 0:
            print(f'>>> Adding to {table_name}. New data found.')
            return (True, new_data)
        else:
            print(f'>>> Not adding to {table_name}. No new data found.')
            return (False, list(''))
        
def add_3_fields(data_set:dict[pd.DataFrame,str,list], 
                 con: sqlite3.Connection, cur: sqlite3.Cursor)-> None:
    """
    Function adding data into mediums table, which consists of 3 columns.

    :param data_set: A dict contaning data to be added, table name, and field / column name.
    Data is a Pandas DataFrame, table name and field name are both strings.
    :raise KeyError: If key name does not match the pattern
    :raise sqlite3.ProgrammingError: If table or field names doesn't match those in the DB
    :daise sqlite3.OperationalError: If any eceptions on the DB side are raised, i.g. DB being locked
    :return: None
    """
    
    # col = data_set['data'].columns.values.tolist()
    
    query = (f"""
             INSERT INTO {data_set['table']} 
             ({data_set['fields'][0]}, 
             {data_set['fields'][1]}, 
             {data_set['fields'][2]})
             VALUES
             (:{data_set['fields'][0]}, 
             :{data_set['fields'][1]}, 
             :{data_set['fields'][2]})
             """)

    for _, elem in data_set['data'].iterrows():
        # TODO do type casting before data insertion
        
        data = {f'{field}': value for field, value in zip(data_set['fields'], elem)}
        cur.execute(query, data)
        
    con.commit()

def get_column_names(table_name:str, 
                    cur: sqlite3.Cursor)-> list[str]:
    """
    A function which returns the names of selected table from the DB.

    :param table_name: Name of a table out of which colum names are extracted from
    :param con: A database connection object
    :param cur: A cursor database object
    :raise sqlite3.ProgrammingError: If column names does not match DB contents
    :daise sqlite3.OperationalError: If any eceptions on the DB side are raised, i.g. DB being locked
    :return: List containing all the column names present in selected table. 
    :rtype: list[str]
    """
    
    query = ("SELECT name FROM pragma_table_info(:table_name);")
    data = {'table_name': table_name}
    cur.execute(query, data)
    table_data = cur.fetchall()
    temp = []
    for elem in table_data[1:]:
        temp.append(elem[0])
    table_data = temp
    
    return table_data

def get_id_for_submediums(fields_:list[str], table_:str, 
                          cur: sqlite3.Cursor)-> tuple[bool, pd.DataFrame]:
    """
    Gets IDs from reference tables to mediums table. 
    Mainly connects submediums with broadcaster and reach tables.
    Returns a bool for logic purposes and data to be added into mediums.

    :param fields_: A list containing field / column names represented as a str
    :param table_: Name of the table out of which the data is going to be pulled, 
    represented as a str
    :param con: A database connection object
    :param cur: A cursor database object
    :raise sqlite3.ProgrammingError: If column names don't fit into the table design
    :daise sqlite3.OperationalError: If any eceptions on the DB side are raised, i.g. DB being locked
    :return: Tuple containing bool for logic purposes and a Pandas DataFrame 
    as data to be added into the DB during the update or initial DB fill.
    :rtype: tuple[bool, pd.DataFrame]
    """
    
    submedia = df[['Submedium', 'Wydawca/Nadawca', 'Zasięg medium']].sort_values(by='Submedium')
    submedia.drop_duplicates(subset=['Submedium'], keep='first', inplace=True, ignore_index=True)
    
    if sum(submedia.value_counts()) != submedia.index.max() + 1:
        exit('Max index different than the length of the list.')
    
    query1 = (f"SELECT nadawca, id FROM nadawcy;")
    cur.execute(query1)
    nadawcy = dict(cur.fetchall())

    query2 = ("SELECT zasieg, id FROM zasiegi;")
    cur.execute(query2)
    zasiegi = dict(cur.fetchall())

    submedia['Wydawca/Nadawca'] = submedia['Wydawca/Nadawca'].map(nadawcy)
    submedia['Zasięg medium'] = submedia['Zasięg medium'].map(zasiegi)
    
    trigger, submedia = check_for_data_3_fields(fields_, table_, submedia, cur)
    
    return (trigger, submedia)

def check_for_data_3_fields(fields:list[str], table_name: str, submedia: pd.DataFrame, 
                            cur: sqlite3.Cursor)-> tuple[bool,pd.DataFrame]:
    """
    Returns a bool for logic purposes and data to be added into mediums table.
    If DB is empty returns original DF. During data update process returns the data not present in the DB
    or indicates there is nothing to be added.

    :param fields: A list containing field / column names represented as a str
    :param table_name: Name of the table into which data is going to be added as a str
    :param submedia: Pandas DataFrame containing data to add.
    :param cur: A cursor database object
    :raise sqlite3.ProgrammingError: If column names don't fit into the table design
    :daise sqlite3.OperationalError: If any eceptions on the DB side are raised, i.g. DB being locked
    :return: Tuple containing bool for logic purposes and a Pandas DataFrame 
    as data to be added into the DB during the update
    :rtype: tuple[bool, pd.DataFrame]
    """
    
    query = (f"SELECT id, {fields[0]}, {fields[1]}, {fields[2]} FROM {table_name};")
    cur.execute(query)
    
    in_db = pd.DataFrame(cur.fetchall())
    in_db.rename(columns={0: 'id',1: fields[0], 2: fields[1], 3: fields[2]}, inplace=True)
    
    if len(in_db) == 0:
        return (True, submedia)
    else:
        in_db = list(in_db[fields[0]])
        in_df = submedia.copy()
        # we check if df contains new data in comparison to DB
        new_data = in_df[~in_df.isin(in_db)].dropna()
        
        if len(new_data) != 0 :
            print(f'>>> Adding to {table_name}. New data found.')
            return (True, new_data)
        print(f'>>> Not adding to {table_name}. No new data found.')
        return (False, submedia)

def get_id_for_ad_time(fields: list[str], table_: str, 
                       cur: sqlite3.Cursor)-> tuple[bool,pd.DataFrame]:
    """
    Gets IDs from reference tables to ad_time_details table. 
    Mainly connects time details of singular ad emission with other tables containing details via IDs.
    This function populates one of two core tables in this DB.
    Returns a bool for logic purposes and data to be added into mediums.

    :param fields: A list containing field / column names represented as a str
    :param table_: Name of the table out of which the data is going to be pulled, 
    represented as a str
    :param cur: A cursor database object
    :raise sqlite3.ProgrammingError: If column names don't fit into the table design
    :daise sqlite3.OperationalError: If any eceptions on the DB side are raised, i.g. DB being locked
    :return: Tuple containing bool for logic purposes and a Pandas DataFrame 
    as data to be added into the DB during the update or initial DB fill.
    :rtype: tuple[bool, pd.DataFrame]
    """
    
    czasy_reklam = df[['Data', 'Godzina bloku reklamowego', 'GG', 'MM', 'dł./mod.', 'Daypart', 'dł. Ujednolicona', 'Detale_kod_reklamy']]
    czasy_reklam.index = czasy_reklam.index + get_index_val(table_, cur)

    query1 = ("SELECT dl_ujednolicona, id FROM dl_ujednolicone;")
    cur.execute(query1)
    unified_lengths = dict(cur.fetchall())

    query2 = ("SELECT daypart, id FROM dayparty;")
    cur.execute(query2)
    dayparts = dict(cur.fetchall())

    czasy_reklam.loc[:, 'Daypart'] = czasy_reklam['Daypart'].map(dayparts)
    czasy_reklam.loc[:, 'dł. Ujednolicona'] = czasy_reklam['dł. Ujednolicona'].map(unified_lengths)

    trigger, czasy_reklam = get_min_max_date(fields, table_, czasy_reklam, cur)
    
    return (trigger, czasy_reklam)

def get_min_max_date(fields: list[str], table_: str, dataframe: pd.DataFrame, 
                     cur: sqlite3.Cursor)-> tuple[bool, pd.DataFrame]:
    """
    Gets max and min dates from selected table. Then checks if dates present in the DF passed as a parameter
    are outside of dates range. If so, allows data insertion into the DB, if not it informs the user, 
    and proceedes with the rest of the code.

    :param fields: A list containing field / column names represented as a str
    :param table_: Name of the table out of which the data is going to be pulled, 
    represented as a str
    :param dataframe: Pandas DataFrame with the new data to be checked if not present in selected table
    :param cur: A cursor database object
    :raise sqlite3.ProgrammingError: If column names don't fit into the table design
    :daise sqlite3.OperationalError: If any eceptions on the DB side are raised, i.g. DB being locked
    :return: Tuple containing bool for logic purposes and a Pandas DataFrame 
    as data to be added into the DB during the update or initial DB fill.
    :rtype: tuple[bool, pd.DataFrame]
    """
    
    # Get max date from DB
    query = (f"SELECT MAX({fields[0]}) FROM {table_};")
    cur.execute(query)
    in_db_max = pd.Timestamp(cur.fetchone()[0])
    
    # Get min date from DB
    query = (f"SELECT MIN({fields[0]}) FROM {table_};")
    cur.execute(query)
    in_db_min = pd.Timestamp(cur.fetchone()[0])
    
    # Get max and min date from DF
    in_df_max = dataframe['Data'].max()
    in_df_min = dataframe['Data'].min()
    
    # Check if min and max dates from DF are between range of dates from DB
    min_df_in_db_range = in_db_min <= in_df_min <= in_db_max
    max_df_in_db_range = in_db_min <= in_df_max <= in_db_max
    
    # Main logic add if empty or when dates not present in DB.
    if  pd.isnull(in_db_max) or pd.isnull(in_db_min) :
        return (True, dataframe)
    elif not min_df_in_db_range and not max_df_in_db_range:
        print(f'>>> Adding to {table_}. New data found.')
        return (True, dataframe)
    else:
        print(f'>>> Not adding to {table_}. One or more dates already in DB.')
        print(f'>>> Check the data you want to insert into DB.')
        return (False, dataframe)


def add_8_fields(data_set:dict[pd.DataFrame,str,list[str]], 
                 con: sqlite3.Connection, cur: sqlite3.Cursor)-> None:
    """
    Function adding data into ad_time_details table, which consists of 8 columns.

    :param data_set: A dict contaning data to be added, table name, and field / column name.
    Data is a Pandas DataFrame, table name and field name are both strings.
    :raise KeyError: If key name does not match the pattern
    :param con: A database connection object
    :param cur: A cursor database object
    :raise sqlite3.ProgrammingError: If column names don't fit into the table design
    :daise sqlite3.OperationalError: If any eceptions on the DB side are raised, i.g. DB being locked
    :return: None
    """
    
    query = (f"""
             INSERT INTO {data_set['table']} (
                {data_set['fields'][0]},
                {data_set['fields'][1]},
                {data_set['fields'][2]},
                {data_set['fields'][3]},
                {data_set['fields'][4]},
                {data_set['fields'][5]},
                {data_set['fields'][6]},
                {data_set['fields'][7]})
            VALUES(
                :{data_set['fields'][0]},
                :{data_set['fields'][1]},
                :{data_set['fields'][2]},
                :{data_set['fields'][3]},
                :{data_set['fields'][4]},
                :{data_set['fields'][5]},
                :{data_set['fields'][6]},
                :{data_set['fields'][7]}
            )
             """)

    for _, elem in data_set['data'].iterrows():
        elem.Data = elem.Data.strftime('%Y-%m-%d')
        data = {f'{field}': value for field, value in zip(data_set['fields'], elem)}
        cur.execute(query, data)
        # con.commit()
    
    con.commit()

In [3]:
con = sqlite3.Connection('./radio_ads.db')
cur = con.cursor()

In [4]:
print('Creating DataFrame.')
# Reads the dataframe
df = pd.read_csv('../data/live_3.csv', delimiter=';', thousands=',',
                 dtype={'Dzień': 'category', 'Dzień tygodnia': 'category', 
                        'Nr. tyg.': 'category', 'Rok': 'category',
                        'Miesiąc': 'category', 'Zasięg medium': 'category',
                        'Brand': 'category', 'Produkt(4)': 'category',
                        'Kod Reklamy': 'int32', 'Opis Reklamy': 'object',
                        'Typ reklamy': 'category', 'Wydawca/Nadawca': 'category',
                        'Submedium': 'category', 'dł./mod.': 'int8',
                        'GG': 'int8', 'MM': 'int8', 'Koszt [zł]': 'Int32',
                        'L.emisji': 'int8', 'dł. Ujednolicona': 'Int8', 
                        'Godzina bloku reklamowego': 'category'}, 
                 encoding='utf-8', parse_dates=['Data'], low_memory=False
                 )
df.sort_values(by='Data', axis=0, inplace=True)
df.reset_index(inplace=True)
df.drop('index', axis=1, inplace=True)
ind = df.index.values + get_index_val('spoty', cur)
df2 = df[['Data', 'Kod Reklamy']].copy()
df2['Data_str'] = df2['Data'].dt.strftime('%Y-%m-%d')
df['Detale_kod_reklamy'] = df2[['Data_str', 'Kod Reklamy']].apply(lambda x: f'{x["Data_str"]} - {x["Kod Reklamy"]} - {ind[x.name]}', axis=1)
del df2

Creating DataFrame.


In [7]:
# Create datasets for simple tables
dow2 = ['Poniedziałek', 'Wtorek', 'Środa', 'Czwartek', 'Piątek',
        'Sobota', 'Niedziela']
months = [
    'Styczeń', 'Luty', 'Marzec', 'Kwiecień', 'Maj',
    'Czerwiec', 'Lipiec', 'Sierpień', 'Wrzesień',
    'Październik', 'Listopad', 'Grudzień'
]
dates = df['Data'].dt.strftime('%Y-%m-%d').unique()
brands = df['Brand'].sort_values().unique()
lengths = ['10', '15', '20', '30', '45', '60',]
dayparts = df['Daypart'].unique()
product_types = df['Produkt(4)'].sort_values().unique()
broadcasters = df['Wydawca/Nadawca'].sort_values().unique()
reaches = df['Zasięg medium'].unique()

# TODO pomyśleć, czy potrzeba tu odwołania do db i pobrania czegokolwiek. Chyba nie.
data_set = [{'data': dow2, 'table': 'dni_tyg', 'field': 'dzien_tyg'},
            {'data': months, 'table': 'miesiace', 'field': 'miesiac'},
            {'data': dates, 'table': 'data_czas', 'field': 'data'},
            {'data': brands, 'table': 'brandy', 'field': 'brand'},
            {'data': lengths, 'table': 'dl_ujednolicone', 'field': 'dl_ujednolicona'},
            {'data': dayparts, 'table': 'dayparty', 'field': 'daypart'},
            {'data': product_types, 'table': 'typy_produktu', 'field': 'typ_produktu'},
            {'data': broadcasters, 'table': 'nadawcy', 'field': 'nadawca'},
            {'data': reaches, 'table': 'zasiegi', 'field': 'zasieg'},
            ]

avoid_adding = ['dni_tyg', 'miesiace', 'dl_ujednolicone', 'dayparty', 'zasiegi']

In [8]:
# Inserting data into simple tables
print('Inserting data to one input tables.')
try:
    iter_over_inputs(data_set, con, cur, avoid_adding)
except sqlite3.ProgrammingError as e:
    con.close()
    print('Failed to input the data.')
    print(f'Error: {e}')
except sqlite3.OperationalError as e:
    con.close()
    print('Failed to input the data.')
    print(f'Error: {e}')
    exit()

# Create and insert data into mediums table
print('Inserting data to the three input table.')
fields = get_column_names('submedia', cur)
trigger, submedia = get_id_for_submediums(fields, 'submedia', cur)
data_set2 = {'data': submedia, 'table': 'submedia', 'fields': fields}
if trigger:
    try:
        add_3_fields(data_set2, con, cur)
    except sqlite3.ProgrammingError as e:
        con.close()
        print('Failed to input the data.')
        print(f'Error: {e}')
    except sqlite3.OperationalError as e:
        con.close()
        print('Failed to input the data.')
        print(f'Error: {e}')
        exit()

# Create and insert data into ad_time_details table
print('Inserting data to the eight input table.')
fields = get_column_names('czasy_reklam', cur)
trigger, czasy_reklam = get_id_for_ad_time(fields, 'czasy_reklam', cur)
data_set3 = {'data': czasy_reklam, 'table': 'czasy_reklam', 'fields': fields}
if trigger:
    try:
        add_8_fields(data_set3, con, cur)
    except sqlite3.ProgrammingError as e:
        con.close()
        print('Failed to input the data.')
        print(f'Error: {e}')
        exit()
    except sqlite3.OperationalError as e:
        con.close()
        print('Failed to input the data.')
        print(f'Error: {e}')
        exit()

Inserting data to one input tables.
>>> Not adding to dni_tyg. No new data found.
>>> Not adding to miesiace. No new data found.
>>> Not adding to data_czas. No new data found.
>>> Not adding to brandy. No new data found.
>>> Not adding to dl_ujednolicone. No new data found.
>>> Not adding to dayparty. No new data found.
>>> Not adding to typy_produktu. No new data found.
>>> Not adding to nadawcy. No new data found.
>>> Not adding to zasiegi. No new data found.
