# Source Validation - report

Idea of following report is to show user the most important information about the quality of data source. Each data source constist sales data taken from daughter companies. Script was created to check if data source can be migrated to production instance. Result of following report is saved in html file, which should be checked by the person who has created it and rated if everything with the source is ok.

Technical info:
- Report is based on Python and SQL queries
- Final result is displayed in section Final report, apart of raw data there are also plots which help to visualise/find anomalies and suspicious data
- Result of the report is html file, which is created in last cell -> it is based on jupyter notebook cell tags
- Every cell has its tag which suggest what should be done with it in html file:
    - remove_input - code input is not saved to html file
    - remove_output - output of cell is not saved to html file


In [None]:
# Imports
import sqlalchemy
import pandas as pd
import plotly.express as px
import random

## Source ID + database connection

In [None]:
# ID of the source
while True:
    try:
        SOURCE_ID = int(input("Insert SOURCE_ID which should be checked: ").strip())
    except ValueError:
        print("Please insert a number")
    else:
        break

# Login data 
database_username = input("Please insert database LOGIN: ").lower().strip()
database_password = input("Please insert database PASSWORD: ")

# Float format display - 3 decimal places
pd.set_option('display.float_format', lambda x: '%.3f' % x) 

In [None]:
# DB credentials
POSTGRES_ADDRESS = '111.11.11.11' ## DB ADDRESS IF IT'S NOT ON PANOPLY
POSTGRES_PORT = '5555'
POSTGRES_USERNAME = f'{database_username}' ## PANOPLY/POSTGRES USERNAME
POSTGRES_PASSWORD = f'{database_password}' ## PANOPLY/POSTGRES PASSWORD 
POSTGRES_DBNAME = 'postgres_database' ## DATABASE NAME

# A long string that contains the necessary Postgres login information
postgres_str = ('postgresql://{username}:{password}@{ipaddress}:{port}/{dbname}'.format(username=POSTGRES_USERNAME,
                                                                                        password=POSTGRES_PASSWORD,
                                                                                        ipaddress=POSTGRES_ADDRESS,
                                                                                        port=POSTGRES_PORT,
                                                                                        dbname=POSTGRES_DBNAME))

# Create the connection
cnx = sqlalchemy.create_engine(postgres_str)

## Data preparation

In [None]:
# Source name
source_details = f"""select name
                        from postgres_database.source_details
                        where id = {SOURCE_ID}
                        """

try:
    df_source_name = pd.read_sql(source_details, cnx)
except Exception as e:
        print(str(e))
        
SOURCE_NAME = df_source_name.full_name.to_string(index=False)

In [None]:
# First and last source loading

SOURCE_LOAD_DATETIME = f""" select sl.source_id
                            , max(sl.load_datetime)  as max_load_datetime_source_loading_info
                            , min(sl.load_datetime)  as min_load_datetime_source_loading_info
                            , max(sli.load_datetime) as max_load_datetime_source_loading_info_init
                            , min(sli.load_datetime) as min_load_datetime_source_loading_info_init
                        from postgres_database.source_loading_info sl
                                join postgres_database.source_loading_info_init sli
                                    on sl.id = sli.source_loading_info_id
                        where sl.source_id = {SOURCE_ID}
                        and sl.load_datetime >= '2022-01-01'::date
                        group by 1 """
                                
try:
    df_source_load_datetime = pd.read_sql_query(SOURCE_LOAD_DATETIME, cnx)
    
except Exception as e:
        print(str(e))

# Source loading dates - min i max
SOURCE_LOAD_DATE_MIN = df_source_load_datetime['min_load_datetime_source_loading_info'].to_string(index=False)[:10]
SOURCE_LOAD_DATE_MAX = df_source_load_datetime['max_load_datetime_source_loading_info'].to_string(index=False)[:10]

print(f"First load date: {SOURCE_LOAD_DATE_MIN}")
print(f"Last load date: {SOURCE_LOAD_DATE_MAX}")

# Dates which are going to be used in a loop for downloading the data from the database
SOURCE_LOAD_DATE_MIN_MINUS_1_DAY = (df_source_load_datetime['min_load_datetime_source_loading_info'] + pd.DateOffset(-1)).to_string(index=False)[:10]
SOURCE_LOAD_DATE_MAX_PLUS_1_DAY = (df_source_load_datetime['max_load_datetime_source_loading_info'] + pd.DateOffset(1)).to_string(index=False)[:10]

# Time periods for downloading the data
load_dates_intervals = pd.date_range(SOURCE_LOAD_DATE_MIN_MINUS_1_DAY, SOURCE_LOAD_DATE_MAX_PLUS_1_DAY, periods=6, inclusive='both')

# Date conversion for SQL query
load_dates_intervals_str = [str(date.to_period('D')) for date in load_dates_intervals]
load_dates_intervals_str_string = ', '.join(load_dates_intervals_str)
print(load_dates_intervals_str)

In [None]:
# Looking for suspicious source loads
try:
    querry = f"""select
                    load_datetime,
                    load_reason,
                    purpose,
                    coalesce(branch,'null') as branch, 
                    coalesce(workflow_name,'null') as "workflow_name",
                    coalesce(row_count,0) as row_count,
                    source_id,
                    datetime_added,
                    coalesce(manufacturer_column,'null') as manufacturer_column,
                    coalesce(prod_name_col,'null') as prod_name_col,
                    load_datetime as "weeks"
                from postgres_database.source_loading_info_init
                where source_id = {SOURCE_ID} and load_datetime >= '{SOURCE_LOAD_DATE_MIN}'::date
                order by load_datetime asc;"""
                
    df_row_count = pd.read_sql(querry, cnx)
    
except Exception as e:
        print(str(e))

df_row_count['load_datetime'] = pd.to_datetime(df_row_count['load_datetime'], utc=True)
manufacturer_column = df_row_count[["manufacturer_column", "load_datetime"]].groupby(["manufacturer_column"]).agg(['min', 'max', 'count'])
prod_name_col = df_row_count[["prod_name_col", "load_datetime"]].groupby(["prod_name_col"]).agg(['min', 'max', 'count'])
df_row_count.rename(columns = {'row_count': 'amount'}, inplace = True)
df_row_count2 = df_row_count[['weeks','amount','load_datetime','workflow_name']].groupby(['weeks','workflow_name']).agg({'amount':'sum','load_datetime':'first'}).sort_values(by='load_datetime').reset_index()

In [None]:
# Function to detect all suspicious values if exist + calculate statistics
def suspicious_data(dataframe):
    df = pd.DataFrame(dataframe[0:0])
    median = dataframe.amount.median()
    std = dataframe.amount.std()
    minim = dataframe.amount.min()
    maxim = dataframe.amount.max()
    
    suspected_row_counts = []
    for i in dataframe.amount:
        if i < median - 2 * std or i > median + 2 * std:
            suspected_row_counts.append(i)
        else:
            pass
    
    suspected_row_counts.sort()
    
    if len(suspected_row_counts) == 0:
        print(f"No suspicious data was found in source files from all {dataframe.count().max()} rows")
    else:
        print(f"Suspicious data was found in source files {len(suspected_row_counts)} times \nfrom all {max(dataframe.count())} rows \nThey take the following values: ")        
    
    print(f"""

    Maximum value:
    {maxim}

    Average value:
    {median}

    Minimum value:
    {minim}
    
    Standard deviation:
    {std}
    """)

    if len(suspected_row_counts) > 0:
        print(f"""Following values are suspicious: {suspected_row_counts}""")
    elif len(suspected_row_counts) == 0:
        print("""No suspicious values were found""")
    else:
        print("""Following values needs to be checked""")
    
    df = dataframe[(dataframe['amount'].isin(suspected_row_counts))] 
    pd.set_option("display.max_rows", None, "display.max_columns", None)  # Display whole DataFrame
    
    return display(df)


In [None]:
#Loads per date from postgres_database.source_loading_info
try:
    querry = f"""
                select 
                count(*) as amount,
                load_datetime::timestamp,
                name
                from postgres_database.source_loading_info
                where SOURCE_ID = {SOURCE_ID}
                  and is_deleted is FALSE
                  and to_reload is FALSE
                  and load_datetime between '{SOURCE_LOAD_DATE_MIN}'::date and '{SOURCE_LOAD_DATE_MAX}'::date
                group by 2, 3
                order by 2;"""
              
    df_source_load_datetime_per_source = pd.read_sql(querry, cnx)
    
except Exception as e:
        print(str(e))

In [None]:
#Files to reload
try:
    querry = f"""
                select source_id,
                       to_reload,
                       count(*)
                from postgres_database.source_loading_info
                where source_id = {SOURCE_ID}
                group by 1, 2
                """
              
    df_reload = pd.read_sql(querry, cnx)
    
except Exception as e:
        print(str(e))

In [None]:
#SPN - Source Product Name
try:
    querry = f"""
                select load_datetime,
                    case
                        when s_product_id is null then 's_product_id - empty'
                        when s_product_id = 0 then 's_product_id = 0'
                        when s_product_id > 0 then 's_product_id - ok'
                        else 'Check'
                        end as s_product_id,
                    count(*) amount
                from postgres_database.product_stock_info
                where SOURCE_ID = {SOURCE_ID} and load_datetime between '{SOURCE_LOAD_DATE_MIN}' and '{SOURCE_LOAD_DATE_MAX}'
                group by 1, 2
                order by 1
                """
    df_spn = pd.read_sql(querry, cnx)
    
except Exception as e:
        print(str(e))

In [None]:
# Old prices in price_product_info_latest
try:
    querry = f"""
            select qty_min,
                currency,
                SOURCE_ID,
                package_type_id,
                region_id,
                s_product_id,
                symbol,
                array_agg(product_id),
                count(*)
            from postgres_database.price_product_info_latest ppc
                    join postgres_database.product pr on pr.id = ppc.product_id
            where SOURCE_ID = {SOURCE_ID}

            group by 1, 2, 3, 4, 5, 6, 7
            having count(*) > 1;
            """
    df_old_prices = pd.read_sql(querry,cnx)

except Exception as e:
        print(str(e))

In [None]:
# price_product_info table validation

# Lists for loaded data storage
df_price_product_info_product_per_date_list = []
df_price_product_info_packages_list = []
df_price_product_info_max_min_list = []
df_price_product_info_currency_list = []
df_MRV_list = []

# Loop which goes through load data intervals
for i in range(len(load_dates_intervals_str) - 1): 
      
    price_product_info_product_per_date = f""" select load_datetime,
                                        count(product_id)
                                        from postgres_database.price_product_info
                                        where source_id = {SOURCE_ID}
                                        and load_datetime > '{load_dates_intervals_str[i]}'::date AND load_datetime <= '{load_dates_intervals_str[i+1]}'::date
                                        group by 1
                                        order by 1"""

    price_product_info_packages = f"""with price_product_info as(
                                                select distinct package_type_id
                                                from postgres_database.price_product_info
                                                where source_id = {SOURCE_ID}
                                                and load_datetime > '{load_dates_intervals_str[i]}'::date AND load_datetime <= '{load_dates_intervals_str[i+1]}'::date
                                                ),

                                    package_type as (
                                        select id,
                                                full_name
                                        from postgres_database.package_type
                                    ),

                                    package_root as(
                                        select ptr.id,
                                            root_id,
                                            full_name as full_name_root
                                        from postgres_database_measure.package_type_root as ptr
                                        left join package_type on package_type.id = ptr.root_id
                                    )

                                select distinct package_type_id,
                                    full_name,
                                    root_id,
                                    full_name_root
                                from price_product_info as pp
                                left join package_type as pt on pt.id=pp.package_type_id
                                left join package_root as pr on pr.id=pp.package_type_id; """

    price_product_info_max_min = f"""select max(price) as max_price
                                    , min(price)  as min_price
                                    , max(qty_min) as max_qty
                                    , min(qty_min) as min_qty
                                from postgres_database.price_product_info
                                where source_id = {SOURCE_ID}
                                and load_datetime > '{load_dates_intervals_str[i]}'::date AND load_datetime <= '{load_dates_intervals_str[i+1]}'::date;"""
                            
    price_product_info_currency = f"""select distinct region_id,
                                            name as region_name,
                                            currency
                                from postgres_database.price_product_info as pp
                                left join postgres_database.region on region.id = pp.region_id
                                where source_id = {SOURCE_ID}
                                and load_datetime > '{load_dates_intervals_str[i]}'::date AND load_datetime <= '{load_dates_intervals_str[i+1]}'::date;"""

    # Minimum Row Value -> check if minimum value of product (minimum amount * product price) is not suspicious
    MRV = f"""
            with currency_rate as (
                select
                    currency_from,
                    first(rate order by for_date desc) as latest_rate
                from postgres_database.currency_rate
                where currency_to = 'PLN'
                group by 1
                ),

                price_product_infos as(
                select pp.product_id,
                    price,
                    currency,
                    load_datetime,
                    qty_min,
                    package_type_id,
                    region_id,
                    case when s_product_id is null then 0
                        else s_product_id
                            end as s_product_id,
                    case when pp.currency = cr.currency_from then pp.price * cr.latest_rate
                        else pp.price
                            end as price_in_pln
                from postgres_database.price_product_info as pp
                left join currency_rate as cr on cr.currency_from = pp.currency
                where source_id = {SOURCE_ID}
                and load_datetime > '{load_dates_intervals_str[i]}'::date AND load_datetime <= '{load_dates_intervals_str[i+1]}'::date
                and price < 1
                ),

                mop_per_date as(
                        select product_id,
                            package_type_id,
                            region_id,
                            s_product_id,
                            max(load_datetime) as load_datetime,
                            min(qty_min) as qty_min
                        from price_product_infos
                        group by 1,2,3,4
                        ),

                MRV as(
                    select pp.product_id,
                        pp.package_type_id,
                        pp.region_id,
                        pp.s_product_id,
                        pp.load_datetime,
                        pp.qty_min,
                        price,
                        currency,
                        price_in_pln,
                        (price_in_pln * mpd.qty_min)::float as MRV_pln
                    from price_product_infos pp
                    join mop_per_date as mpd on pp.product_id = mpd.product_id
                            and mpd.load_datetime = pp.load_datetime
                            and mpd.qty_min = pp.qty_min
                            and mpd.package_type_id = pp.package_type_id
                            and mpd.region_id = pp.region_id
                            and mpd.s_product_id = pp.s_product_id
                )

            select product_id,
                price,
                currency,
                price_in_pln,
                qty_min,
                MRV_pln,
                load_datetime,
                package_type_id,
                region_id,
                s_product_id
            from MRV
            where MRV_pln < 0.1; """
    
 # Loading data from database and adding it to the list                       
    try:
        df_price_product_info_product_per_date_load = pd.read_sql_query(price_product_info_product_per_date, cnx)
        df_price_product_info_product_per_date_list.append(df_price_product_info_product_per_date_load)
        
        df_price_product_info_packages_load = pd.read_sql_query(price_product_info_packages, cnx)
        df_price_product_info_packages_list.append(df_price_product_info_packages_load)
               
        df_price_product_info_max_min_load = pd.read_sql_query(price_product_info_max_min, cnx)
        df_price_product_info_max_min_list.append(df_price_product_info_max_min_load) 
               
        df_price_product_info_currency_load = pd.read_sql_query(price_product_info_currency, cnx)   
        df_price_product_info_currency_list.append(df_price_product_info_currency_load)   
        
        df_MRV_load = pd.read_sql_query(MRV, cnx)
        df_MRV_list.append(df_MRV_load)
        
        print(f"Data loaded: {load_dates_intervals_str[i]} - {load_dates_intervals_str[i+1]}")
   
    except Exception as e:
            print(str(e))
    
# Concatenate data from list into one DataFrame
df_price_product_info_product_per_date = pd.concat(df_price_product_info_product_per_date_list, ignore_index=True)
df_price_product_info_packages = pd.concat(df_price_product_info_packages_list, ignore_index=True)
df_price_product_info_max_min = pd.concat(df_price_product_info_max_min_list, ignore_index=True)
df_price_product_info_currency = pd.concat(df_price_product_info_currency_list, ignore_index=True)
df_MRV = pd.concat(df_MRV_list, ignore_index=True)

# Duplicates removal
df_price_product_info_packages.drop_duplicates(inplace=True)
df_price_product_info_currency.drop_duplicates(inplace=True)

# Dataframes details - info
print("df_price_product_info_product_per_date:", df_price_product_info_product_per_date.shape)
print("df_price_product_info_packages:", df_price_product_info_packages.shape)
print("df_price_product_info_max_min:", df_price_product_info_max_min.shape)
print("df_price_product_info_currency:", df_price_product_info_currency.shape)
print("df_MRV:", df_MRV.shape)


In [None]:
# product_stock_info table validation

# Lists for loaded data storage
df_product_stock_info_product_per_date_list = []
df_product_stock_info_packages_list = []
df_product_stock_info_max_min_list = []
df_product_stock_info_region_list = []
df_product_stock_info_higher_than_zero_list = []

# Loop which goes through load data intervals
for i in range(len(load_dates_intervals_str) - 1): 

    product_stock_info_product_per_date = f""" select load_datetime,
                                                    count(product_id)
                                        from postgres_database.product_stock_info
                                        where source_id = {SOURCE_ID}
                                        and load_datetime > '{load_dates_intervals_str[i]}'::date AND load_datetime <= '{load_dates_intervals_str[i+1]}'::date
                                        group by 1
                                        order by 1;"""

    product_stock_info_packages = f"""with product_stock_info as(
                                            select distinct package_type_id
                                            from postgres_database.product_stock_info
                                            where source_id = {SOURCE_ID}
                                            and load_datetime > '{load_dates_intervals_str[i]}'::date AND load_datetime <= '{load_dates_intervals_str[i+1]}'::date
                                    ),

                                    package_type as (
                                        select id,
                                               full_name
                                        from postgres_database.package_type
                                    ),

                                    package_root as(
                                        select ptr.id,
                                            root_id,
                                            full_name as full_name_root
                                        from postgres_database_measure.package_type_root as ptr
                                        left join package_type on package_type.id = ptr.root_id
                                    )

                                select distinct package_type_id,
                                    full_name,
                                    root_id,
                                    full_name_root
                                from product_stock_info as pp
                                left join package_type as pt on pt.id=pp.package_type_id
                                left join package_root as pr on pr.id=pp.package_type_id;"""

    product_stock_info_max_min = f"""select max(stock)  as max_stock,
                                            min(stock)  as min_stock
                                    from postgres_database.product_stock_info
                                    where source_id = {SOURCE_ID}
                                    and load_datetime > '{load_dates_intervals_str[i]}'::date AND load_datetime <= '{load_dates_intervals_str[i+1]}'::date;"""
             
    product_stock_info_region = f""" select distinct
                                            region_id,
                                            name as region_name
                                from postgres_database.product_stock_info as ps
                                left join postgres_database.region on region.id = ps.region_id
                                where source_id = {SOURCE_ID}
                                and load_datetime > '{load_dates_intervals_str[i]}'::date AND load_datetime <= '{load_dates_intervals_str[i+1]}'::date;"""
                                
    product_stock_info_higher_than_zero = f"""select load_datetime,
                                                    case when stock = 0 then 'rowny zero'
                                                            when stock > 0 then 'wiekszy od zera'
                                                            else 'Podejrzany stock'
                                                            end as stock_type,
                                                    count(distinct product_id) as number_of_products
                                                from postgres_database.product_stock_info as ps
                                                left join postgres_database.region on region.id = ps.region_id
                                                where source_id = {SOURCE_ID}
                                                and load_datetime > '{load_dates_intervals_str[i]}'::date AND load_datetime <= '{load_dates_intervals_str[i+1]}'::date
                                                group by 1,2;"""

# Loading data from database and adding it to the list                        
    try:
        df_product_stock_info_product_per_date_load = pd.read_sql_query(product_stock_info_product_per_date, cnx)
        df_product_stock_info_product_per_date_list.append(df_product_stock_info_product_per_date_load)
        
        df_product_stock_info_packages_load = pd.read_sql_query(product_stock_info_packages, cnx)
        df_product_stock_info_packages_list.append(df_product_stock_info_packages_load)
        
        df_product_stock_info_max_min_load = pd.read_sql_query(product_stock_info_max_min, cnx)
        df_product_stock_info_max_min_list.append(df_product_stock_info_max_min_load)
        
        df_product_stock_info_region_load = pd.read_sql_query(product_stock_info_region, cnx)
        df_product_stock_info_region_list.append(df_product_stock_info_region_load)
        
        df_product_stock_info_higher_than_zero_load = pd.read_sql_query(product_stock_info_higher_than_zero, cnx)
        df_product_stock_info_higher_than_zero_list.append(df_product_stock_info_higher_than_zero_load)
        
        print(f"Data loaded: {load_dates_intervals_str[i]} - {load_dates_intervals_str[i+1]}")
        
    except Exception as e:
            print(str(e))
            
# Concatenate data from list into one DataFrame
df_product_stock_info_product_per_date = pd.concat(df_product_stock_info_product_per_date_list, ignore_index=True)
df_product_stock_info_packages = pd.concat(df_product_stock_info_packages_list, ignore_index=True)
df_product_stock_info_max_min = pd.concat(df_product_stock_info_max_min_list, ignore_index=True)
df_product_stock_info_region = pd.concat(df_product_stock_info_region_list, ignore_index=True)
df_product_stock_info_higher_than_zero = pd.concat(df_product_stock_info_higher_than_zero_list, ignore_index=True)

# Duplicates removal
df_product_stock_info_packages.drop_duplicates(inplace=True)
df_product_stock_info_region.drop_duplicates(inplace=True)

# Dataframes info
print("df_product_stock_info_product_per_date:", df_product_stock_info_product_per_date.shape)
print("df_product_stock_info_packages:", df_product_stock_info_packages.shape)
print("df_product_stock_info_max_min:", df_product_stock_info_max_min.shape)
print("df_product_stock_info_region:", df_product_stock_info_region.shape)
print("df_product_stock_info_higher_than_zero:", df_product_stock_info_higher_than_zero.shape)


In [None]:
# price_product_info_latest and product_stock_info_latest tabbles validation

price_product_info_latest = f""" select *
                                from postgres_database.price_product_info_latest
                                where source_id = {SOURCE_ID}
                                and load_datetime > '{SOURCE_LOAD_DATE_MIN}'::date"""

product_stock_info_latest = f""" select *
                                from postgres_database.product_stock_info_latest
                                where source_id = {SOURCE_ID}
                                and load_datetime > '{SOURCE_LOAD_DATE_MIN}'::date"""
                     
packages_price_product_info_latest = f"""with price_product_info as (
                                                    select distinct package_type_id
                                                    from postgres_database.price_product_info_latest
                                                    where source_id = {SOURCE_ID}
                                                    and load_datetime > '{SOURCE_LOAD_DATE_MIN}'::date
                                                ),

                                                package_type as (
                                                    select id,
                                                            full_name
                                                    from postgres_database.package_type
                                                ),

                                                package_root as(
                                                    select ptr.id,
                                                        root_id,
                                                        full_name as full_name_root
                                                    from postgres_database_measure.package_type_root as ptr
                                                    left join package_type on package_type.id = ptr.root_id
                                                )

                                            select distinct package_type_id,
                                                full_name,
                                                root_id,
                                                full_name_root
                                            from price_product_info as pp
                                            left join package_type as pt on pt.id=pp.package_type_id
                                            left join package_root as pr on pr.id=pp.package_type_id;"""

packages_product_stock_info_latest = f"""with product_stock_info as (
                                            select distinct package_type_id
                                            from postgres_database.product_stock_info_latest
                                            where source_id = {SOURCE_ID}
                                            and load_datetime > '{SOURCE_LOAD_DATE_MIN}'::date
                                            ),

                                            package_type as (
                                                select id,
                                                        full_name
                                                from postgres_database.package_type
                                            ),

                                            package_root as(
                                                select ptr.id,
                                                    root_id,
                                                    full_name as full_name_root
                                                from postgres_database_measure.package_type_root as ptr
                                                left join package_type on package_type.id = ptr.root_id
                                            )

                                        select distinct package_type_id,
                                            full_name,
                                            root_id,
                                            full_name_root
                                        from product_stock_info as pp
                                        left join package_type as pt on pt.id=pp.package_type_id
                                        left join package_root as pr on pr.id=pp.package_type_id;"""
                     
try:
    df_price_latest = pd.read_sql_query(price_product_info_latest, cnx)
    df_stock_latest = pd.read_sql_query(product_stock_info_latest, cnx)
    df_packages_price_product_info_latest = pd.read_sql_query(packages_price_product_info_latest, cnx)
    df_packages_product_stock_info_latest = pd.read_sql_query(packages_price_product_info_latest, cnx)

except Exception as e:
        print(str(e))

# DataFrames info        
print(f"price_latest: {df_price_latest.shape}")
print(f"stock_latest: {df_stock_latest.shape}")

In [None]:
# 10 random rows
def random_rows_check(dataframe):
    if dataframe.shape[0] == 0:
        return 'No products to check'    
    elif dataframe.shape[0] < 10:
        return dataframe
    else:
        list_of_10_random_products = random.sample(range(0, dataframe.shape[0]), 10)
        return dataframe.iloc[list_of_10_random_products]

In [None]:
# Code needed for correct saving plots in html, needs to be run before creation of the plots
import plotly.io
plotly.io.renderers.default = "notebook"

## Final report

In [None]:
# skonczylem tutaj
print(
f"""

Source details: 
{SOURCE_ID} - {SOURCE_NAME} 

""")

print(f"""
      
Following load dates of source were checked:
 
from:
{SOURCE_LOAD_DATE_MIN}

to:
{SOURCE_LOAD_DATE_MAX}

""")

# check if correct column was chosen for manufacturer data
print("""Data about manufacturers was taken from following column:\n""")
display(manufacturer_column)

# check if correct column was chosen for product name during the data load
print("""Data about products name was taken from following column:\n""")
display(prod_name_col)

print("""------------------------------------Price Product Info-------------------------------------""")

# Product packages check -> check if suspicious packages werent loaded to database
if df_price_product_info_packages.empty:
    print('\033[1mNo packages found for table price_product_info!\033[0m')
else:    
    with pd.option_context('display.max_rows', None,
                        'display.max_columns', None,
                        'display.precision', 3,
                        ):
        print("Following packages found for table price_product_info:")
        display(df_price_product_info_packages)

# Min/Max price check -> check if max price is not suspiciosly high and min price suspiciously low
if  df_price_product_info_max_min.empty:
    print(f"No min/max price was foun in table: price_product_info\n")   
else:
    with pd.option_context('display.max_rows', None,
                        'display.max_columns', None,
                        'display.precision', 3,
                        ):
        print(f"Following min/max prices were found for table: price_product_info, for following load dates:\n{load_dates_intervals_str_string}\n")
        display(df_price_product_info_max_min) 

# Currency/region check -> Check if correct currency was loaded for each region
if  df_price_product_info_currency.empty:
    print(f"No currency/region was found in table price_product_info\n")   
else:
    with pd.option_context('display.max_rows', None,
                        'display.max_columns', None,
                        'display.precision', 3,
                        ):
        print(f"I have found following currency/regions in table price_product_info:\n")
        display(df_price_product_info_currency) 

print("""------------------------------------Product Stock Info-------------------------------------""")

# Product packages check -> check if suspicious packages werent loaded to database       
if df_product_stock_info_packages.empty:
    print('\n\033[1mNo packages found for table product_stock_info!\033[0m\n')
else:    
    with pd.option_context('display.max_rows', None,
                        'display.max_columns', None,
                        'display.precision', 3,
                        ):
        print("Following packages were found for table product_stock_info:")
        display(df_product_stock_info_packages)

# Min/Max product stock check -> check if suspicious stock value was not loaded to database      
if  df_product_stock_info_max_min.empty:
    print(f"No min/max value was found for stocks\n")   
else:
    with pd.option_context('display.max_rows', None,
                        'display.max_columns', None,
                        'display.precision', 3,
                        ):
        print(f"Min/max stocks, for following load dates:\n{load_dates_intervals_str_string}\n")
        display(df_product_stock_info_max_min) 

# Region check        
if  df_product_stock_info_region.empty:
    print(f"No region was found for table: product_stock_info\n")   
else:
    with pd.option_context('display.max_rows', None,
                        'display.max_columns', None,
                        'display.precision', 3,
                        ):
        print(f"Following regions were found for table product_stock_info:\n")
        display(df_product_stock_info_region) 

print("""-------------------------------------------------------------------------------------""")

# Old prices load check -> 2 prices for 1 product (in following table there should be only latest price) -> table price_product_info_latest
if df_old_prices.empty:
    print(f"No old prices were found in table: price_product_info_latest\n")
else:
    print(f"There were found probable doubled loads in table price_product_info_latest \n there are 2 prices for 1 product due to different manufactures which are not synonimized")
    display(df_old_prices)

# 10 random products with suspicious MRV (Minimum Row Value) which need to be checked
print(f"\nNumber of products with suspicious MRV: {df_MRV.shape[0]}")
print('List of 10 products with suspicious MRV which needs to be checked:')  
display(random_rows_check(df_MRV))


## Figures

In [None]:
# List of unique workflow names for function suspicious_data -> number of rows in raw files
list_of_suspicious_workflow_names = []
for i in df_row_count.WORKFLOW_NAME:
    if i not in list_of_suspicious_workflow_names:
        list_of_suspicious_workflow_names.append(i)
    else:
        pass

In [None]:
# Comparison of table source_loading_info_init with number of rows in raw files
for name in list_of_suspicious_workflow_names:
    print(f"""Checking table source_loading_info_init and number of raw files rows (row_count) for \033[1mworkflow: {i}\033[0m""")
    suspicious_data(df_row_count.loc[df_row_count['WORKFLOW_NAME']==f'{name}'])


figure1 = px.line(df_row_count, 
              x="load_datetime", 
              y="amount", 
              color='WORKFLOW_NAME', 
              markers=True,
                    full_name = 'Sum of raw files rows per week and workflow',
                    labels={
                     "amount": "Number of rows",
                     "WORKFLOW_NAME": "Workflow name",
                     "weeks": "week-month-year"
                 },
              width=1500, 
              height=600)
figure1.show()

In [None]:
# Prices load check
print(f"Number of prices load per day check")
df_price_product_info_product_per_date_suspicious = df_price_product_info_product_per_date.rename(columns = {'load_datetime':'load_date', 'count':'amount'})
suspicious_data(df_price_product_info_product_per_date_suspicious)

df_price_product_info_product_per_date = df_price_product_info_product_per_date.rename(columns = {'load_datetime':'load_date', 'count':'number_of_loaded_prices'})

figure2 = px.line(df_price_product_info_product_per_date, 
              x="load_date", 
              y="number_of_loaded_prices", 
              full_name='Number of loads per day for prices',
              markers=True, 
              width=1500, 
              height=600,
              )

figure2.show()


In [None]:
# SPN check
print(f"SPN check")
suspicious_data(df_spn)

figure3 = px.line(df_spn.sort_values(by='load_datetime'), 
              x="load_datetime", 
              y="amount", 
              color='s_product_id', 
              markers=True,
                    full_name = f'Quantity and type of SPN for {SOURCE_NAME}',
                    labels={
                     "amount": "Number of rows",
                     "s_product_id": "SPN type"
                 },
              width=1500, 
              height=600)
figure3.show()

In [None]:
# Stock load
df_product_stock_info_product_per_date = df_product_stock_info_product_per_date.rename(columns = {'load_datetime':'load_date', 'count':'number_of_loaded_stocks'})

figure4 = px.line(df_product_stock_info_product_per_date, 
              x="Load_date", 
              y="Number_of_loaded_stocks", 
              full_name='Number_of_loaded_stocks_per_date', 
              markers=True, 
              width=1500, 
              height=600)

figure4.show()

In [None]:
# Amount and type of stock (stock>0 or stock=0) loaded
figure5 = px.line(df_product_stock_info_higher_than_zero, 
              x="load_datetime", 
              y="number_of_products", 
              color='stock_type', 
              full_name='Quantity and stock type loaded per day',
              markers=True, 
              width=1500, 
              height=600)

figure5.show()

In [None]:
# Number of loaded prices per date
number_of_products_per_load = df_price_latest[['load_datetime', 'product_id']].groupby(by='load_datetime', as_index=False).count()
number_of_products_per_load_figure = number_of_products_per_load.rename(columns = {'load_datetime':'load_date', 'product_id':'number_of_loaded_prices'})

figure6 = px.line(number_of_products_per_load_figure, 
              x="load_date", 
              y="number_of_loaded_prices", 
              full_name='Number of loaded prices per date, table: price_product_info_latest', 
              markers=True, 
              width=1500, 
              height=600)

figure6.show()

In [None]:
# Number of zero SPNs loaded
number_of_zero_spn = df_price_latest[df_price_latest['s_product_id'] == 0][['load_datetime', 's_product_id']].groupby(by='load_datetime', as_index=False).count()

figure7 = px.line(number_of_zero_spn, 
              x="load_datetime", 
              y="s_product_id", 
              full_name='Number of loads of zero SPNs, table: price_product_info_latest', 
              markers=True, 
              )

figure7.show()

In [None]:
print("Number of stocks loaded, based on table product_stock_info_latest\n")

number_of_products_per_load = df_stock_latest[['load_datetime', 'product_id']].groupby(by='load_datetime', as_index=False).count()
number_of_products_per_load_figure = number_of_products_per_load.rename(columns = {'load_datetime':'load_date', 'product_id':'number of loaded stocks'})

figure8 = px.line(number_of_products_per_load_figure, 
              x="load_date", 
              y="number of loaded stocks", 
              full_name='Number of stocks loaded per date, table product_stock_info_latest', 
              markers=True, 
              width=1500, 
              height=600)

figure8.show()

In [None]:
# Number of zero SPNs from product_stock_info_latest
number_of_zero_spn_stock_curr = df_stock_latest[df_stock_latest['s_product_id'] == 0][['load_datetime', 's_product_id']].groupby(by='load_datetime', as_index=False).count()

figure9 = px.line(number_of_zero_spn_stock_curr, 
              x="load_datetime", 
              y="s_product_id", 
              full_name='Number of zero SPNs from product_stock_info_latest', 
              markers=True, 
              )

figure9.show()

In [None]:
print("Check of table product_stock_info_latest\n")
try:
    if df_stock_latest.empty:
        print("Table product_stock_info_latest is empty.")
    else:       
        # product_stock_info_latest stats - min/max stock, regions, packages
        stock_max_stock_latest = int(df_stock_latest.stock.max())
        stock_min_stock_latest = int(df_stock_latest.stock.min())
        
        region_stock_latest = df_stock_latest.loc[:,['region_id']]
        region_stock_latest.drop_duplicates(inplace=True)
        region_stock_latest.sort_values(by='region_id', inplace=True)
        
        print("Regions found in table product_stock_info_latest:")
        display(region_stock_latest)
        
        print("Following packages were found for table product_stock_info_latest:")
        display(df_packages_product_stock_info_latest)
        
        print("\n\nMax stock found in product_stock_info_latest:", stock_max_stock_latest)
        print("Min stock found in product_stock_info_latest:", stock_min_stock_latest)

except Exception as e:
    print(str(e))
    

In [None]:
print("Check of table price_product_info_latest\n")
try:
    if df_price_latest.empty:
        print("Table price_product_info_latest is empty.")        
    else:  
        max_price_price_latest = float(df_price_latest.price.max())
        min_price_price_latest = float(df_price_latest.price.min())

        currency_region_price_latest = df_price_latest.loc[:, ['region_id', 'currency']]
        currency_region_price_latest.drop_duplicates(inplace=True)
        currency_region_price_latest.sort_values(by='region_id', inplace=True)

        print("Regions and currencies found for table: price_product_info_latest:")
        display(currency_region_price_latest)
        
        print("Packages found for: price_product_info_latest:")
        display(df_packages_price_product_info_latest)
        
        print("\n\nMax price found in table: price_product_info_latest:", max_price_price_latest)
        print("Min price found in table: price_product_info_latest:{:.10f}".format(min_price_price_latest))

except Exception as e:
    print(str(e))

In [None]:
# Number of suspicious stocks 
print('Number of suspicious stocks loaded:\n')
try:
    df_product_stock_info_product_per_date_suspicious = df_product_stock_info_product_per_date.rename(columns = {'load_datetime':'load_date', 'ilosc_zaladowanych_stockow':'amount'})
    suspicious_data(df_product_stock_info_product_per_date_suspicious)
    display(suspicious_data)

except Exception as e:
    print(str(e))

## Saving report to html file

In [None]:
from traitlets.config import Config
import nbformat as nbf
from nbconvert.exporters import HTMLExporter
from nbconvert.preprocessors import TagRemovePreprocessor

# Setup config
c = Config()

# Configure tag removal
c.TagRemovePreprocessor.remove_cell_tags = ("remove_cell",)
c.TagRemovePreprocessor.remove_all_outputs_tags = ('remove_output',)
c.TagRemovePreprocessor.remove_input_tags = ('remove_input',)
c.TagRemovePreprocessor.enabled = True

# Configure and run out exporter
c.HTMLExporter.preprocessors = ["nbconvert.preprocessors.TagRemovePreprocessor"]

exporter = HTMLExporter(config=c)
exporter.register_preprocessor(TagRemovePreprocessor(config=c),True)

# Configure and run our exporter - returns a tuple - first element with html,
# second with notebook metadata
output = HTMLExporter(config=c).from_filename("Source_validation.ipynb")

# Write to output html file
with open("source_validation_full.html",  "w", encoding="utf-8") as f:
    f.write(output[0])