# Detecting schema from CSV-file for ADE by using DuckDB

This Notebook can be used to create valid ADE entity import CSV:s from data files in CSV format.

## Initialization

In [22]:
import duckdb
import pandas as pd
import os

In [23]:
# Path for all CSV files
path = "airbnb/"
dir_list = os.listdir(path)
print(dir_list)

['reviews.csv', 'listings.csv']


## Creating DuckDB table from CSV with read_csv_auto

In [24]:
tables = []

for file_name in dir_list:
    file_format = file_name.split('.')[-1]
    
    if file_format == 'csv':
        table_name = file_name.split('.')[0]
        tables.append(table_name)
        # Use read_csv_auto(\'{1}\', types={{\'ounces\': DECIMAL}} to give column specific data type
        csv_sql = """
                CREATE OR REPLACE TABLE {0} AS 
                SELECT * FROM read_csv_auto(\'{1}\', header=True);
                """.format(table_name, path + file_name)
        print(csv_sql)
        duckdb.sql(csv_sql)


                CREATE OR REPLACE TABLE reviews AS 
                SELECT * FROM read_csv_auto('airbnb/reviews.csv', header=True);
                

                CREATE OR REPLACE TABLE listings AS 
                SELECT * FROM read_csv_auto('airbnb/listings.csv', header=True);
                


In [25]:
# Printing read tables and displaying schema in DuckDB
for table_name in tables:
    print(table_name)
    duckdb.sql("DESCRIBE {0};".format(table_name)).show()

reviews
┌───────────────┬─────────────┬─────────┬─────────┬─────────┬───────┐
│  column_name  │ column_type │  null   │   key   │ default │ extra │
│    varchar    │   varchar   │ varchar │ varchar │ varchar │ int32 │
├───────────────┼─────────────┼─────────┼─────────┼─────────┼───────┤
│ listing_id    │ BIGINT      │ YES     │ NULL    │ NULL    │  NULL │
│ id            │ BIGINT      │ YES     │ NULL    │ NULL    │  NULL │
│ date          │ DATE        │ YES     │ NULL    │ NULL    │  NULL │
│ reviewer_id   │ BIGINT      │ YES     │ NULL    │ NULL    │  NULL │
│ reviewer_name │ VARCHAR     │ YES     │ NULL    │ NULL    │  NULL │
│ comments      │ VARCHAR     │ YES     │ NULL    │ NULL    │  NULL │
└───────────────┴─────────────┴─────────┴─────────┴─────────┴───────┘

listings
┌──────────────────────────────────────────────┬─────────────┬─────────┬─────────┬─────────┬───────┐
│                 column_name                  │ column_type │  null   │   key   │ default │ extra │
│         

## Detecting varchar columns and max lengths
DuckDB does not have lengths for VARCHAR columns, so let's query the max lenght of each column

In [26]:
for table_name in tables:
    varchar_columns_df = duckdb.sql("""
                SELECT 
                    column_name, 
                    data_type 
                FROM INFORMATION_SCHEMA.COLUMNS 
                WHERE table_name = \'{0}\' AND data_type = \'VARCHAR\';""".format(table_name)).df()

    full_sql = ""
    for index, row in varchar_columns_df.iterrows():
        sql = """
                SELECT 
                    MAX(LENGTH({0})) AS column_length, 
                    \'{0}\' AS column_name 
                FROM {1}""".format(row['column_name'], table_name)
    
        # Adding union all to get one table with column lengths and names
        if index != varchar_columns_df.index[-1]:
            sql += '\n UNION ALL \n'
        full_sql += sql
    
    varchar_column_lengths_df = duckdb.query(full_sql).df()

    duckdb.sql("""
                CREATE OR REPLACE TABLE varchar_column_lengths_{0} AS 
                SELECT * 
                FROM varchar_column_lengths_df""".format(table_name))

In [27]:
for table_name in tables:
    varchar_table_name = "varchar_column_lengths_" + table_name
    print(varchar_table_name)
    duckdb.sql("SELECT * FROM {0};".format(varchar_table_name)).show()

varchar_column_lengths_reviews
┌───────────────┬───────────────┐
│ column_length │  column_name  │
│     int64     │    varchar    │
├───────────────┼───────────────┤
│            35 │ reviewer_name │
│          6164 │ comments      │
└───────────────┴───────────────┘

varchar_column_lengths_listings
┌───────────────┬──────────────────────────────┐
│ column_length │         column_name          │
│    double     │           varchar            │
├───────────────┼──────────────────────────────┤
│          47.0 │ listing_url                  │
│          15.0 │ source                       │
│          75.0 │ name                         │
│        1000.0 │ description                  │
│        1000.0 │ neighborhood_overview        │
│         126.0 │ picture_url                  │
│          43.0 │ host_url                     │
│          33.0 │ host_name                    │
│          39.0 │ host_location                │
│        4920.0 │ host_about                   │
│           

## Creating entity import CSV with entity import query

In [28]:
# Variables for entity import query
database_dbms_product = 'AGILE_DATA_ENGINE'
package_name = 'SANDBOX_PACKAGE'
entity_physical_type = 'METADATA_ONLY'
entity_schema = 'SRC'
entity_type = 'SOURCE'
entity_dv_source = 'SYSTEM'

In [29]:
df_all = pd.DataFrame()

for table_name in tables:
    sql_file = open("information_schema_query.sql", "r").read()
    
    # Replacing variables with actual values
    sql_file = sql_file.replace('<table_name>', table_name)
    sql_file = sql_file.replace('<datatype_dbms_product>', database_dbms_product)
    sql_file = sql_file.replace('<package_name>', package_name)
    sql_file = sql_file.replace('<entity_physical_type>', entity_physical_type)
    sql_file = sql_file.replace('<entity_schema>', entity_schema)
    sql_file = sql_file.replace('<entity_type>', entity_type)
    sql_file = sql_file.replace('<entity_dv_source>', entity_dv_source)
    sql_file = sql_file.replace('<varchar_column_lengths>', "varchar_column_lengths_" + table_name)


    entity_import_df = duckdb.query(sql_file).df()
    pd.options.display.float_format = '{:,.0f}'.format
    
    df_all = pd.concat([entity_import_df, df_all], axis=0, ignore_index=True)

# Creating CSV from DataFrame
df_all.to_csv('entity_import.csv', index=False, float_format="%.0f")

In [30]:
# Printout of DataFrame
df_all

Unnamed: 0,datatype_dbms_product,package_name,entity_physical_type,entity_schema,entity_name,entity_type,attribute_name,attribute_datatype,attribute_length,attribute_precision,attribute_scale,nullable,position
0,AGILE_DATA_ENGINE,SANDBOX_PACKAGE,METADATA_ONLY,SRC,listings,SOURCE,id,INTEGER8,,,,1,1
1,AGILE_DATA_ENGINE,SANDBOX_PACKAGE,METADATA_ONLY,SRC,listings,SOURCE,listing_url,VARCHAR,47,,,1,2
2,AGILE_DATA_ENGINE,SANDBOX_PACKAGE,METADATA_ONLY,SRC,listings,SOURCE,scrape_id,INTEGER8,,,,1,3
3,AGILE_DATA_ENGINE,SANDBOX_PACKAGE,METADATA_ONLY,SRC,listings,SOURCE,last_scraped,DATE,,,,1,4
4,AGILE_DATA_ENGINE,SANDBOX_PACKAGE,METADATA_ONLY,SRC,listings,SOURCE,source,VARCHAR,15,,,1,5
...,...,...,...,...,...,...,...,...,...,...,...,...,...
76,AGILE_DATA_ENGINE,SANDBOX_PACKAGE,METADATA_ONLY,SRC,reviews,SOURCE,id,INTEGER8,,,,1,2
77,AGILE_DATA_ENGINE,SANDBOX_PACKAGE,METADATA_ONLY,SRC,reviews,SOURCE,date,DATE,,,,1,3
78,AGILE_DATA_ENGINE,SANDBOX_PACKAGE,METADATA_ONLY,SRC,reviews,SOURCE,reviewer_id,INTEGER8,,,,1,4
79,AGILE_DATA_ENGINE,SANDBOX_PACKAGE,METADATA_ONLY,SRC,reviews,SOURCE,reviewer_name,VARCHAR,35,,,1,5
