# Metadata Preprocessing

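This notebook cleans and preprocesses the posts metadata: it loads the raw CSV into DuckDB, runs data-quality checks, normalizes column types, derives image/time/caption features, and exports the result to Parquet files partitioned by year and month.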

In [1]:
# Import
import os, duckdb
from pathlib import Path

In [2]:
# Configuration
# DuckDB database file and the raw metadata CSV to ingest
DB_PATH = "D:/db/meta.duckdb"
INPUT_CSV = "D:/metadata.csv"

# Output locations: point these at a different directory when running on a
# sample instead of the entire dataset
OUT_PARQUET_DIR = "D:/dataset/meta_clean"
BAD_ROWS_OUT = "D:/dataset/bad_rows"
DEDUP_LOG_OUT = "D:/dataset/dedup_log"
EXPORT_BAD_ROWS = True
EXPORT_DEDUP_LOG = True
SAMPLE_ROWS = 0  # 0 keeps every row; a positive value becomes a LIMIT on the raw view

# Create every directory the notebook writes to (no error if it already exists)
required_dirs = [Path(DB_PATH).parent, OUT_PARQUET_DIR]
if EXPORT_BAD_ROWS:
    required_dirs.append(BAD_ROWS_OUT)
if EXPORT_DEDUP_LOG:
    required_dirs.append(DEDUP_LOG_OUT)
for directory in required_dirs:
    os.makedirs(directory, exist_ok=True)

# Start connection
con = duckdb.connect(DB_PATH)

# Session settings, applied in order:
#   threads=2                     -> at most two worker threads
#   memory_limit='5GB'            -> cap DuckDB's memory (chosen to stay below the machine's RAM)
#   preserve_insertion_order=false -> result row order is no longer guaranteed (less overhead)
for setting in (
    "PRAGMA threads=2;",
    "SET memory_limit='5GB';",
    "SET preserve_insertion_order=false;",
):
    con.execute(setting)

print("\n Set up ready")


 Set up ready


In [40]:
# List every table and view registered in the connected database
tables_query = """SELECT * FROM information_schema.tables;
"""
tables = con.sql(tables_query).fetchdf()

print(tables)

  table_catalog table_schema       table_name  table_type  \
0          meta         main  influencers_dim  BASE TABLE   
1          meta         main  influencers_raw  BASE TABLE   
2          meta         main    v_influencers        VIEW   

  self_referencing_column_name reference_generation user_defined_type_catalog  \
0                         None                 None                      None   
1                         None                 None                      None   
2                         None                 None                      None   

  user_defined_type_schema user_defined_type_name is_insertable_into is_typed  \
0                     None                   None                YES       NO   
1                     None                   None                YES       NO   
2                     None                   None                 NO       NO   

  commit_action TABLE_COMMENT  
0          None          None  
1          None          None  
2        

In [6]:
# 1) Expose the raw CSV as a DuckDB view, keeping every column as text
# Optional row cap: only applied when SAMPLE_ROWS is a positive number
if SAMPLE_ROWS and SAMPLE_ROWS > 0:
    limit_clause = f"LIMIT {SAMPLE_ROWS}"
else:
    limit_clause = ""

# Double any single quote so the path is a valid SQL string literal
csv_path_sql = INPUT_CSV.replace("'", "''")

# The comments embedded in the SQL below are in Italian:
#   sample_size=-1     -> "consider everything for basic sniffing of delimiters/quotes"
#   ignore_errors=true -> "tolerate any malformed records"
raw_view_sql = f"""
CREATE OR REPLACE VIEW v_metadata_raw AS
SELECT * FROM read_csv_auto('{csv_path_sql}',
    header=true,
    all_varchar=true,
    sample_size=-1,            -- considera tutto per sniffing basico di delimitatori/quote
    ignore_errors=true         -- tollera eventuali record malformati
) {limit_clause};
"""
con.sql(raw_view_sql)
print("\n Step 1 completed")


 Step 1 completed


In [5]:
# Number of posts in the raw view, before any cleaning
initial_count_sql = """
SELECT COUNT(*) AS initial_nposts
FROM v_metadata_raw"""
print(con.sql(initial_count_sql))

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

‚îå‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îê
‚îÇ initial_nposts ‚îÇ
‚îÇ     int64      ‚îÇ
‚îú‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚î§
‚îÇ        9726198 ‚îÇ
‚îî‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îò



In [10]:
# Column names and types of the raw view, as imported from the CSV
raw_columns_info = con.sql("""PRAGMA table_info(v_metadata_raw)""").fetchdf()
print(raw_columns_info)

    cid                   name     type  notnull dflt_value     pk
0     0               filename  VARCHAR    False       None  False
1     1               username  VARCHAR    False       None  False
2     2             is_private  VARCHAR    False       None  False
3     3             like_count  VARCHAR    False       None  False
4     4                  width  VARCHAR    False       None  False
5     5                 height  VARCHAR    False       None  False
6     6               location  VARCHAR    False       None  False
7     7                  is_ad  VARCHAR    False       None  False
8     8  automated_description  VARCHAR    False       None  False
9     9                   time  VARCHAR    False       None  False
10   10                caption  VARCHAR    False       None  False
11   11          comment_count  VARCHAR    False       None  False
12   12               is_video  VARCHAR    False       None  False
13   13                    url  VARCHAR    False       None  F

In [9]:
# Filenames that occur more than once in the raw data.
# The recorded run returned 0 rows, i.e. no duplicates.
duplicate_filenames_sql = """
SELECT filename, COUNT(*) AS n_duplicates
FROM v_metadata_raw
GROUP BY filename
HAVING COUNT(*) > 1
"""
print(con.sql(duplicate_filenames_sql))

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

‚îå‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚î¨‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îê
‚îÇ filename ‚îÇ n_duplicates ‚îÇ
‚îÇ varchar  ‚îÇ    int64     ‚îÇ
‚îú‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚î¥‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚î§
‚îÇ         0 rows          ‚îÇ
‚îî‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îò



In [7]:
# Count NULLs per raw column, plus the total row count for reference.
# One "SUM(CASE WHEN <col> IS NULL ...) AS <col>_missing" term is generated per column.
raw_columns_to_check = [
    "filename", "username", "is_private", "like_count", "width", "height",
    "location", "is_ad", "automated_description", "time", "caption",
    "comment_count", "is_video", "url",
]
null_counters = ",\n".join(
    f"    SUM(CASE WHEN {column} IS NULL THEN 1 ELSE 0 END) AS {column}_missing"
    for column in raw_columns_to_check
)
missing_raw_sql = f"""SELECT
{null_counters},
    COUNT(*) AS total_rows
FROM v_metadata_raw;
"""
print(con.sql(missing_raw_sql).fetchdf())

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

   filename_missing  username_missing  is_private_missing  like_count_missing  \
0               0.0               0.0                 0.0                 0.0   

   width_missing  height_missing  location_missing  is_ad_missing  \
0            0.0             0.0         4928270.0            0.0   

   automated_description_missing  time_missing  caption_missing  \
0                      1128990.0           0.0          87922.0   

   comment_count_missing  is_video_missing  url_missing  total_rows  
0               973320.0               0.0          0.0     9726198  


In [9]:
# Count captions stored as an empty string. A result of 0 means missing
# captions only ever appear as NULL, never as ''.
empty_caption_sql = """SELECT COUNT(*) AS caption_empty
FROM v_metadata_raw
WHERE caption = ''
"""
print(con.sql(empty_caption_sql))

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

‚îå‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îê
‚îÇ caption_empty ‚îÇ
‚îÇ     int64     ‚îÇ
‚îú‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚î§
‚îÇ             0 ‚îÇ
‚îî‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îò



In [10]:
# Check if there are posts with 0 comments.
# comment_count is VARCHAR in the raw view (all_varchar=true), so it is cast
# explicitly: comparing the text column directly with the integer 0 relies on
# DuckDB's implicit casting rules, and a forced cast would abort the query on
# any non-numeric value. TRY_CAST turns such values into NULL, which simply
# does not match.
print(con.sql("""SELECT COUNT(*) AS zero_comments
FROM v_metadata_raw
WHERE TRY_CAST(comment_count AS BIGINT) = 0
"""))

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

‚îå‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îê
‚îÇ zero_comments ‚îÇ
‚îÇ     int64     ‚îÇ
‚îú‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚î§
‚îÇ        691536 ‚îÇ
‚îî‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îò



In [10]:
# Are NULL comment counts a collection error or just low-interaction posts?
# Sample up to 10 posts with more than 200000 likes but no comment_count.
null_comments_sql = """
SELECT filename, like_count, comment_count, caption
FROM v_metadata_raw 
WHERE comment_count IS NULL AND CAST(like_count AS DOUBLE) > 200000
LIMIT 10"""
print(con.sql(null_comments_sql))

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

‚îå‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚î¨‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚î¨‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚î¨‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚

In [6]:
# Number of posts flagged as ads (is_ad = 'True')
ads_true_sql = """
SELECT COUNT(*) AS n_ads
FROM v_metadata_raw
WHERE is_ad = 'True' 
"""
print(con.sql(ads_true_sql))

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

‚îå‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îê
‚îÇ n_ads ‚îÇ
‚îÇ int64 ‚îÇ
‚îú‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚î§
‚îÇ     0 ‚îÇ
‚îî‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îò



In [13]:
# Number of posts NOT flagged as ads (is_ad = 'False'); the result column is still named n_ads
ads_false_sql = """
SELECT COUNT(*) AS n_ads
FROM v_metadata_raw
WHERE is_ad = 'False' 
"""
print(con.sql(ads_false_sql))

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

‚îå‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îê
‚îÇ  n_ads  ‚îÇ
‚îÇ  int64  ‚îÇ
‚îú‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚î§
‚îÇ 9726198 ‚îÇ
‚îî‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îò



In [11]:
# Number of posts flagged as videos (is_video = 'True')
videos_true_sql = """
SELECT COUNT(*) AS n_videos
FROM v_metadata_raw
WHERE is_video = 'True' 
"""
print(con.sql(videos_true_sql))

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

‚îå‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îê
‚îÇ n_videos ‚îÇ
‚îÇ  int64   ‚îÇ
‚îú‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚î§
‚îÇ        0 ‚îÇ
‚îî‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îò



In [14]:
# Number of posts NOT flagged as videos (is_video = 'False'); the result column is still named n_videos
videos_false_sql = """
SELECT COUNT(*) AS n_videos
FROM v_metadata_raw
WHERE is_video = 'False' 
"""
print(con.sql(videos_false_sql))

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

‚îå‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îê
‚îÇ n_videos ‚îÇ
‚îÇ  int64   ‚îÇ
‚îú‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚î§
‚îÇ  9726198 ‚îÇ
‚îî‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îò



In [8]:
# Number of posts flagged as private (is_private = 'True')
private_true_sql = """
SELECT COUNT(*) AS n_private
FROM v_metadata_raw
WHERE is_private = 'True' 
"""
print(con.sql(private_true_sql))

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

‚îå‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îê
‚îÇ n_private ‚îÇ
‚îÇ   int64   ‚îÇ
‚îú‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚î§
‚îÇ         0 ‚îÇ
‚îî‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îò



In [17]:
# Number of posts NOT flagged as private (is_private = 'False'); the result column is still named n_private
private_false_sql = """
SELECT COUNT(*) AS n_private
FROM v_metadata_raw
WHERE is_private = 'False' 
"""
print(con.sql(private_false_sql))

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

‚îå‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îê
‚îÇ n_private ‚îÇ
‚îÇ   int64   ‚îÇ
‚îú‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚î§
‚îÇ   9726198 ‚îÇ
‚îî‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îò



In [27]:
# Posts whose height is zero or negative (up to 10 shown); 0 rows means all heights are positive
nonpositive_height_sql = """
SELECT width, height
FROM v_metadata_raw
WHERE CAST(height AS DOUBLE) <= 0
LIMIT 10"""
print(con.sql(nonpositive_height_sql))

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

‚îå‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚î¨‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îê
‚îÇ  width  ‚îÇ height  ‚îÇ
‚îÇ varchar ‚îÇ varchar ‚îÇ
‚îú‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚î¥‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚î§
‚îÇ      0 rows       ‚îÇ
‚îî‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îò



In [28]:
# Posts whose width is zero or negative (up to 10 shown); 0 rows means all widths are positive
nonpositive_width_sql = """
SELECT width, height
FROM v_metadata_raw
WHERE CAST(width AS DOUBLE) <= 0
LIMIT 10"""
print(con.sql(nonpositive_width_sql))

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

‚îå‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚î¨‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îê
‚îÇ  width  ‚îÇ height  ‚îÇ
‚îÇ varchar ‚îÇ varchar ‚îÇ
‚îú‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚î¥‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚î§
‚îÇ      0 rows       ‚îÇ
‚îî‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îò



In [19]:
# Schema of the raw view: every field is VARCHAR because the CSV was read with all_varchar=true
raw_schema = con.sql("DESCRIBE v_metadata_raw")
raw_schema.df()

Unnamed: 0,column_name,column_type,null,key,default,extra
0,filename,VARCHAR,YES,,,
1,username,VARCHAR,YES,,,
2,is_private,VARCHAR,YES,,,
3,like_count,VARCHAR,YES,,,
4,width,VARCHAR,YES,,,
5,height,VARCHAR,YES,,,
6,location,VARCHAR,YES,,,
7,is_ad,VARCHAR,YES,,,
8,automated_description,VARCHAR,YES,,,
9,time,VARCHAR,YES,,,


In [5]:
# 2) Variable standardization, normalization
# All columns were imported as VARCHAR, so numeric and timestamp fields are cast here.
# - TRY_CAST yields NULL (instead of raising) for values that do not parse
# - filename: surrounding whitespace removed
# - username: trimmed, one leading '@' removed, trimmed again, lower-cased.
#   The value is trimmed BEFORE the '^@' strip; otherwise a handle preceded by
#   whitespace (' @name') would not match the anchored pattern and keep its '@'.
# - comment_count: missing/unparseable values become 0 and negatives are clamped to 0.
#   NOTE(review): this makes "not collected" indistinguishable from "zero comments" —
#   confirm this is the intended imputation.
# - not selected: location, automated_description, url, is_private, is_ad, is_video
con.sql(r"""
CREATE OR REPLACE VIEW v_metadata_norm AS

WITH normalized AS (
SELECT
TRIM(filename) AS filename,
LOWER(TRIM(REGEXP_REPLACE(TRIM(username), '^@', ''))) AS username,
TRY_CAST(like_count AS INTEGER) AS like_count,
TRY_CAST(width AS INTEGER) AS width,
TRY_CAST(height AS INTEGER) AS height,
time AS time_txt,
caption AS caption,
TRY_CAST(comment_count AS INTEGER) AS comment_count_raw
FROM v_metadata_raw
),

typed AS (
SELECT
*,
TRY_CAST(time_txt AS TIMESTAMP) AS time_utc,
GREATEST(COALESCE(comment_count_raw,0), 0)  AS comment_count,
FROM normalized
)

SELECT
    filename, username,
    like_count, comment_count,
    width, height,
    time_utc, caption
FROM typed;
""")
print("\n Step 2 completed")

# 3) Feature engineering
# - image features derived from width and height: aspect ratio (rounded to 2 decimals),
#   area and orientation. All three are NULL unless both dimensions are positive;
#   orientation now uses the same "> 0" guard as aspect_ratio and area, so a row with
#   a zero/negative dimension can no longer get an orientation while its ratio is NULL.
# - time features: date_day, year, month, day of the week (dow) and hour, all taken from time_utc
#   (DuckDB's dow numbers Sunday as 0)
# - caption features: has_caption (caption is not NULL) and caption length in characters
#   (caption_len_char stays NULL when there is no caption)
con.sql("""
CREATE OR REPLACE VIEW v_metadata_1 AS
SELECT *,
-- image features
    CASE WHEN width  > 0 AND height > 0 THEN ROUND(width::DOUBLE / height, 2) ELSE NULL END AS aspect_ratio,
    CASE WHEN width  > 0 AND height > 0 THEN width*height ELSE NULL END AS area,
    CASE WHEN width  > 0 AND height > 0 THEN
        CASE WHEN width>height THEN 'landscape'
        WHEN width=height THEN 'square'
        ELSE 'portrait' END
    ELSE NULL END AS orientation,
-- time features
    CAST(time_utc AS DATE)              AS date_day,
    EXTRACT(year  FROM time_utc)        AS year,
    EXTRACT(month FROM time_utc)        AS month,
    EXTRACT(dow   FROM time_utc)        AS dow,
    EXTRACT(hour  FROM time_utc)        AS hour_utc,
-- text features
    (caption IS NOT NULL)               AS has_caption,
    LENGTH(caption)                     AS caption_len_char
FROM v_metadata_norm;
""")
print("\n Step 3 completed")


 Step 2 completed

 Step 3 completed


In [7]:
# STEP 4 — Export v_metadata_1 to Parquet, partitioned by year/month
# (the duplicate check on filename returned no rows, so no de-duplication is applied)
print("Start step 4 - exporting by year and month")

# Escape single quotes once so the directory can be embedded in SQL string literals
out_dir_sql = OUT_PARQUET_DIR.replace("'", "''")
parquet_glob = f"{out_dir_sql}/**/*.parquet"

# OVERWRITE_OR_IGNORE lets this cell run again when the output directory already
# holds files from an earlier export; without it a partitioned COPY into a
# non-empty directory is rejected, so the notebook could not be re-run.
con.execute(f"""
COPY (SELECT * FROM v_metadata_1)
TO '{out_dir_sql}'
(FORMAT PARQUET, PARTITION_BY (year, month), COMPRESSION SNAPPY, OVERWRITE_OR_IGNORE);
""")

print("Step 4 completed")

# Check rows in the final files = rows in the view
print("\n[QC] Rows in Parquet vs view:")
row_counts = con.execute(f"""
SELECT
  (SELECT COUNT(*) FROM read_parquet('{parquet_glob}', hive_partitioning=1)) AS rows_in_parquet,
  (SELECT COUNT(*) FROM v_metadata_1) AS rows_in_view
""").fetchdf()
print(row_counts)

# Fail loudly on a mismatch, e.g. files left over from an earlier run with a
# different SAMPLE_ROWS that were not overwritten by this export.
n_parquet = int(row_counts["rows_in_parquet"].iloc[0])
n_view = int(row_counts["rows_in_view"].iloc[0])
assert n_parquet == n_view, (
    f"Parquet export has {n_parquet} rows but v_metadata_1 has {n_view}; "
    f"clear {OUT_PARQUET_DIR} and re-run the export"
)

# Check distribution of posts per year/month (first 20 partitions)
print("\n[QC] Distribution per year/month:")
print(con.execute(f"""
SELECT year, month, COUNT(*) AS n
FROM read_parquet('{parquet_glob}', hive_partitioning=1)
GROUP BY 1,2
ORDER BY 1,2
""").fetchdf().head(20))


Start step 4 - exporting by year and month


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

Step 4 completed

[QC] Rows in Parquet vs view:


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

   rows_in_parquet  rows_in_view
0          9726198       9726198

[QC] Distribution per year/month:
    year  month     n
0   2012      2    10
1   2012      3     6
2   2012      4    14
3   2012      5    27
4   2012      6   135
5   2012      7   144
6   2012      8   202
7   2012      9   263
8   2012     10   273
9   2012     11   243
10  2012     12   331
11  2013      1   400
12  2013      2   409
13  2013      3   641
14  2013      4   543
15  2013      5   654
16  2013      6   722
17  2013      7  1009
18  2013      8  1006
19  2013      9  1077


In [9]:
# Marker that the cleaning/export steps are done; the cells that follow only run checks
print("\n Preprocessing completed.")


 Preprocessing completed.


In [5]:
# CHECKS

# Row counts of the raw view and of the final feature view.
# No rows are filtered between the two, so the counts are expected to match.
print("\n[QC] Row counts (bronze / valid)")
for count_query in (
    "SELECT COUNT(*) AS total_rows FROM v_metadata_raw;",
    "SELECT COUNT(*) AS valid_rows  FROM v_metadata_1;",
):
    print(con.sql(count_query).fetchdf())

# Column names and types of the final view
final_columns_info = con.sql("""PRAGMA table_info(v_metadata_1)""").fetchdf()
print(final_columns_info)


[QC] Row counts (bronze / valid)


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

   total_rows
0     9726198


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

   valid_rows
0     9726198
    cid              name       type  notnull dflt_value     pk
0     0          filename    VARCHAR    False       None  False
1     1          username    VARCHAR    False       None  False
2     2        like_count    INTEGER    False       None  False
3     3     comment_count    INTEGER    False       None  False
4     4             width    INTEGER    False       None  False
5     5            height    INTEGER    False       None  False
6     6          time_utc  TIMESTAMP    False       None  False
7     7           caption    VARCHAR    False       None  False
8     8      aspect_ratio     DOUBLE    False       None  False
9     9              area    INTEGER    False       None  False
10   10       orientation    VARCHAR    False       None  False
11   11          date_day       DATE    False       None  False
12   12              year     BIGINT    False       None  False
13   13             month     BIGINT    False       None  False
14   14     

In [11]:
# Count NULLs per column of the final view, plus the total row count.
# One "SUM(CASE WHEN <col> IS NULL ...)" term is generated per column; the result
# column is "<col>_missing" except for time_utc and hour_utc, which are reported
# as time_missing and hour_missing.
checked_columns = [
    "filename", "username", "like_count", "width", "height", "time_utc",
    "caption", "comment_count", "aspect_ratio", "area", "orientation",
    "date_day", "year", "month", "dow", "hour_utc", "has_caption",
    "caption_len_char",
]
alias_stems = {"time_utc": "time", "hour_utc": "hour"}
null_counters = ",\n".join(
    f"    SUM(CASE WHEN {column} IS NULL THEN 1 ELSE 0 END) "
    f"AS {alias_stems.get(column, column)}_missing"
    for column in checked_columns
)
missing_final_sql = f"""SELECT
{null_counters},
    COUNT(*) AS total_rows
FROM v_metadata_1;
"""
print(con.sql(missing_final_sql).fetchdf())


# Summary statistics (count, min, max, mean, median) for likes and comments
print("\n Like/Comment distribution")
engagement_stats_sql = r"""
SELECT
  COUNT(*) AS n,
  MIN(like_count) AS min_like, MAX(like_count) AS max_like,
  MIN(comment_count) AS min_com,  MAX(comment_count) AS max_com,
  AVG(like_count) AS avg_like, AVG(comment_count) AS avg_com,
  MEDIAN(like_count) AS p50_like, MEDIAN(comment_count) AS p50_com
FROM v_metadata_1;
"""
print(con.sql(engagement_stats_sql).fetchdf())

# Number of posts per (year, month) in the final view, in chronological order
print("\n Rows by year/month")
rows_by_month_sql = r"""
SELECT year, month, COUNT(*) AS n
FROM v_metadata_1
GROUP BY 1,2
ORDER BY 1,2;
"""
print(con.sql(rows_by_month_sql).fetchdf())

# Row count read back from the Parquet files on disk
print("\n Rows in Parquet")
# Single quotes are doubled so the directory is a valid SQL string literal
parquet_dir_sql = OUT_PARQUET_DIR.replace("'", "''")
parquet_count_sql = f"""
SELECT COUNT(*) AS rows_in_parquet
FROM read_parquet('{parquet_dir_sql}/**/*.parquet', hive_partitioning=1)
"""
print(con.sql(parquet_count_sql).fetchdf())

# Consistency check: rows in the final view vs. rows in the exported Parquet files
print("\n Consistency: Parquet vs. light view row counts")
# Single quotes are doubled so the directory is a valid SQL string literal
parquet_dir_sql = OUT_PARQUET_DIR.replace("'", "''")
consistency_sql = f"""
SELECT 
  (SELECT COUNT(*) FROM v_metadata_1) AS rows_in_view,
  (SELECT COUNT(*) FROM read_parquet('{parquet_dir_sql}/**/*.parquet', hive_partitioning=1)) AS rows_in_parquet
"""
print(con.sql(consistency_sql).fetchdf())

# Read five rows back from the Parquet files as a visual sanity check
print("\n[QC] Parquet preview (5 rows)")
# Single quotes are doubled so the directory is a valid SQL string literal
parquet_dir_sql = OUT_PARQUET_DIR.replace("'", "''")
preview_sql = f"""
SELECT *
FROM read_parquet('{parquet_dir_sql}/**/*.parquet', hive_partitioning=1)
LIMIT 5;
"""
print(con.sql(preview_sql).fetchdf())

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

   filename_missing  username_missing  like_count_missing  width_missing  \
0               0.0               0.0                 0.0            0.0   

   height_missing  time_missing  caption_missing  comment_count_missing  \
0             0.0           0.0          87922.0                    0.0   

   aspect_ratio_missing  area_missing  orientation_missing  date_day_missing  \
0                   0.0           0.0                  0.0               0.0   

   year_missing  month_missing  dow_missing  hour_missing  \
0           0.0            0.0          0.0           0.0   

   has_caption_missing  caption_len_char_missing  total_rows  
0                  0.0                   87922.0     9726198  

 Like/Comment distribution


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

         n  min_like  max_like  min_com  max_com     avg_like    avg_com  \
0  9726198         0   8444365        0   772581  4301.705051  62.159108   

   p50_like  p50_com  
0     459.0     11.0  

 Rows by year/month


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

    year  month       n
0   2012      2      10
1   2012      3       6
2   2012      4      14
3   2012      5      27
4   2012      6     135
..   ...    ...     ...
83  2019      1  545850
84  2019      2  529088
85  2019      3  558020
86  2019      4  316140
87  2019      5   43670

[88 rows x 3 columns]

 Rows in Parquet
   rows_in_parquet
0          9726198

 Consistency: Parquet vs. light view row counts


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

   rows_in_view  rows_in_parquet
0       9726198          9726198

[QC] Parquet preview (5 rows)
                              filename     username  like_count  \
0   abbeyhagan-311629792938246883.info   abbeyhagan          14   
1  alexbuckham-292296021766594420.info  alexbuckham          39   
2  alexbuckham-292409526477938189.info  alexbuckham          10   
3  alexbuckham-296642007141998055.info  alexbuckham          14   
4  alexbuckham-298317228743875883.info  alexbuckham          31   

   comment_count  width  height            time_utc  \
0              0    612     612 2012-10-27 22:19:50   
1              0    612     612 2012-10-01 06:07:02   
2              4    612     612 2012-10-01 09:52:33   
3              0    612     612 2012-10-07 06:01:44   
4              1    612     612 2012-10-09 13:30:06   

                                             caption  aspect_ratio    area  \
0                           My princess got a bath üíú           1.0  374544   
1  Subway 

In [26]:
# Close the DuckDB connection opened in the configuration cell; any later
# cell that uses `con` will fail until the connection is re-created.
con.close()
print("Connection closed")

Connection closed
