<a href="https://colab.research.google.com/github/Akshayabalaji23/-Dynamic-Data-Ingestion-and-Storage-in-HDFS-with-Automated-Hive-Integration/blob/main/%20Dynamic%20Data%20Ingestion%20and%20Storage%20in%20HDFS%20with%20Automated%20Hive%20Integration.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# ===== User-configurable parameters =====
import getpass
import os

# Pick a specific CSV from the Census site. Replace with the exact path you intend to use.
DATASET_URL = "https://www2.census.gov/programs-surveys/popest/datasets/2020/state/asrh/sc-est2020-alldata6.csv"

# Local filenames/paths
LOCAL_FILE = "census_data.csv"  # saved name after download

# HDFS locations.
# A Python string never expands "$USER", and Hive's LOCATION clause does not either,
# so the account name is resolved here instead of being left as a literal "$USER".
try:
    HDFS_USER = getpass.getuser()
except (KeyError, OSError, ImportError):
    # No login name could be resolved (e.g. a container uid without a passwd entry).
    # Edit this fallback if your HDFS home directory belongs to a different account.
    HDFS_USER = "root"
HDFS_BASE_DIR = f"/user/{HDFS_USER}/census_data"
HDFS_FILE_PATH = f"{HDFS_BASE_DIR}/{LOCAL_FILE}"

# Hive settings
HIVE_DB = "census_db"
HIVE_TABLE = "census_table"

# If your CSV delimiter is comma, keep as is. Otherwise change accordingly.
CSV_DELIMITER = ","

# Every setting, keyed by the name the shell cells look up.
PIPELINE_CONFIG = {
    "DATASET_URL": DATASET_URL,
    "LOCAL_FILE": LOCAL_FILE,
    "HDFS_BASE_DIR": HDFS_BASE_DIR,
    "HDFS_FILE_PATH": HDFS_FILE_PATH,
    "HIVE_DB": HIVE_DB,
    "HIVE_TABLE": HIVE_TABLE,
    "CSV_DELIMITER": CSV_DELIMITER,
}

# %%bash cells run as child processes of the kernel and only see environment
# variables, not Python globals; exporting here keeps `set -u` cells from aborting
# with "unbound variable".
os.environ.update(PIPELINE_CONFIG)

print("Configured:")
for key, value in PIPELINE_CONFIG.items():
    print(f"  {key}={value}")


In [None]:
%%bash -s "$DATASET_URL"
set -euo pipefail
# The notebook's DATASET_URL arrives as the first positional argument (IPython
# expands Python variables on the magic line), so this cell does not depend on
# the value having been exported to the environment.
DATASET_URL="$1"

echo "Checking accessibility (HEAD request)..."
# --fail makes curl exit non-zero on HTTP 4xx/5xx; without it a 404 would look
# like success. The headers are captured first so curl's own exit status is
# what gets tested, rather than the status of a pipeline through `head`.
if ! headers="$(curl --fail --silent --show-error --head --location "$DATASET_URL")"; then
  # printf is used because bash's builtin echo prints "\n" literally without -e.
  printf '\n[ERROR] Could not access %s\n' "$DATASET_URL" >&2
  exit 1
fi
head -n 20 <<<"$headers"

In [None]:
%%bash -s "$DATASET_URL" "$LOCAL_FILE"
set -euo pipefail
# Values come in as positional arguments (IPython expands the Python variables
# on the magic line), matching how the Hive cells receive their settings.
DATASET_URL="$1"; LOCAL_FILE="$2"

echo "Downloading dataset to $LOCAL_FILE ..."
# `wget -O` truncates its target before the transfer starts, so a failed download
# would leave an empty or partial file behind. Download to a scratch name and only
# move it into place once wget has succeeded.
partial_file="${LOCAL_FILE}.part"
trap 'rm -f "$partial_file"' EXIT
wget -O "$partial_file" "$DATASET_URL"
mv -f "$partial_file" "$LOCAL_FILE"
echo "Downloaded: $(ls -lh "$LOCAL_FILE")"

In [None]:
import csv, re
from itertools import islice

SAMPLE_SIZE = 100  # number of data rows inspected when guessing column types


def sniff_types(sample_rows, headers):
    """Guess a Hive type for every column from a sample of data rows.

    Args:
        sample_rows: list of rows, each a list of string cells. Rows shorter
            than the header are tolerated; missing cells are ignored.
        headers: the header row; only its length is used.

    Returns:
        A list with one entry per header: "BIGINT" when every non-empty sampled
        value parses with int(), "DOUBLE" when every one parses with float(),
        otherwise "STRING". A column with no non-empty sampled values is "STRING".
    """
    def parses_as(cast, text):
        # int()/float() signal unparsable text with ValueError; anything else
        # is a real bug and should surface instead of being swallowed.
        try:
            cast(text)
        except ValueError:
            return False
        return True

    types = []
    for col_idx in range(len(headers)):
        col_vals = [r[col_idx] for r in sample_rows if len(r) > col_idx and r[col_idx] != ""]
        # prefer INT -> BIGINT, fallback to DOUBLE, else STRING
        if col_vals and all(parses_as(int, v) for v in col_vals):
            types.append("BIGINT")
        elif col_vals and all(parses_as(float, v) for v in col_vals):
            types.append("DOUBLE")
        else:
            types.append("STRING")
    return types


def clean_column_names(raw_headers):
    """Turn CSV header cells into unique, lower-case Hive column identifiers.

    Characters outside [a-zA-Z0-9_] become "_", a leading digit gets a "c_"
    prefix, and an empty header becomes "col". When two headers sanitise to the
    same name, later ones receive a numeric suffix ("_2", "_3", ...) so the
    generated DDL never declares the same column twice.
    """
    used = set()
    cleaned = []
    for raw in raw_headers:
        name = re.sub(r"[^a-zA-Z0-9_]", "_", raw.strip())
        if re.match(r"^[0-9]", name):
            name = "c_" + name
        if not name:
            name = "col"
        name = name.lower()

        candidate, suffix = name, 1
        while candidate in used:
            suffix += 1
            candidate = f"{name}_{suffix}"
        used.add(candidate)
        cleaned.append(candidate)
    return cleaned


with open(LOCAL_FILE, newline='') as f:
    # Parse with the configured delimiter so the inferred schema matches the
    # FIELDS TERMINATED BY clause below.
    reader = csv.reader(f, delimiter=CSV_DELIMITER)
    header = next(reader, None)
    if header is None:
        raise ValueError(f"{LOCAL_FILE} is empty; cannot infer a schema")
    # islice stops quietly at end of file, so files with fewer than
    # SAMPLE_SIZE data rows are sampled in full instead of raising StopIteration.
    sample = list(islice(reader, SAMPLE_SIZE))

clean_headers = clean_column_names(header)
types = sniff_types(sample, header)
schema_pairs = [f"{c} {t}" for c, t in zip(clean_headers, types)]
ddl_cols = ",\n  ".join(schema_pairs)

create_stmt = f"""
CREATE EXTERNAL TABLE IF NOT EXISTS {HIVE_DB}.{HIVE_TABLE} (
  {ddl_cols}
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '{CSV_DELIMITER}'
STORED AS TEXTFILE
LOCATION '{HDFS_BASE_DIR}'
TBLPROPERTIES ("skip.header.line.count"="1");
""".strip()

print("\n---- Inferred Hive CREATE TABLE DDL ----\n")
print(create_stmt)


In [None]:
%%bash -s "$HDFS_BASE_DIR" "$LOCAL_FILE" "$HDFS_FILE_PATH"
set -euo pipefail
# Settings are handed over as positional arguments (IPython expands the Python
# variables on the magic line), the same way the Hive cells receive theirs, so
# `set -u` does not trip over names that were never exported.
HDFS_BASE_DIR="$1"; LOCAL_FILE="$2"; HDFS_FILE_PATH="$3"

echo "Creating HDFS directory: $HDFS_BASE_DIR"
hadoop fs -mkdir -p "$HDFS_BASE_DIR"
echo "Uploading file to HDFS: $HDFS_FILE_PATH"
# -f overwrites an existing copy so the cell can be re-run.
hadoop fs -put -f "$LOCAL_FILE" "$HDFS_FILE_PATH"
echo "Listing HDFS directory:"
hadoop fs -ls -h "$HDFS_BASE_DIR"

In [None]:
%%bash -s "$HIVE_DB" "$HIVE_TABLE"
set -euo pipefail
# Positional arguments carry the notebook's Hive database and table names.
db="$1"
table="$2"
qualified="${db}.${table}"

# Make sure the target database exists before anything refers to it.
printf '%s\n' "Creating Hive database if not exists: $db"
hive -e "CREATE DATABASE IF NOT EXISTS ${db};"

# Remove any previous definition so the next cell can create the table afresh.
printf '%s\n' "Dropping existing table (if any): ${qualified}"
hive -e "DROP TABLE IF EXISTS ${qualified};"

In [None]:
%%bash -s "$HDFS_BASE_DIR" "$CSV_DELIMITER" "$HIVE_DB" "$HIVE_TABLE" "$LOCAL_FILE"
set -euo pipefail
# The embedded Python below reads its settings from the environment, so the
# positional arguments must be *exported* under the exact names it looks up.
# Plain shell assignments are invisible to child processes, which would make
# the Python step silently fall back to built-in defaults.
export HDFS_BASE_DIR="$1" CSV_DELIMITER="$2" HIVE_DB="$3" HIVE_TABLE="$4"
export LOCAL_FILE="${5:-census_data.csv}"

python - <<'PY'
"""Rebuild the CREATE TABLE statement from the CSV header and save it to create_table.sql."""
import csv
import os
import re
from itertools import islice
from pathlib import Path

# All of these are exported by the surrounding shell just before Python starts.
LOCAL_FILE = os.environ['LOCAL_FILE']
HIVE_DB = os.environ['HIVE_DB']
HIVE_TABLE = os.environ['HIVE_TABLE']
CSV_DELIMITER = os.environ['CSV_DELIMITER']
HDFS_BASE_DIR = os.environ['HDFS_BASE_DIR']

SAMPLE_SIZE = 100  # data rows inspected when guessing column types


def sniff_types(sample_rows, headers):
    """Return one Hive type per header: BIGINT, DOUBLE, or STRING.

    A column is BIGINT when every non-empty sampled value parses with int(),
    DOUBLE when every one parses with float(), and STRING otherwise (including
    columns with no non-empty sampled values).
    """
    def parses_as(cast, text):
        try:
            cast(text)
        except ValueError:
            return False
        return True

    types = []
    for col_idx in range(len(headers)):
        col_vals = [r[col_idx] for r in sample_rows if len(r) > col_idx and r[col_idx] != ""]
        if col_vals and all(parses_as(int, v) for v in col_vals):
            types.append("BIGINT")
        elif col_vals and all(parses_as(float, v) for v in col_vals):
            types.append("DOUBLE")
        else:
            types.append("STRING")
    return types


def clean_column_names(raw_headers):
    """Sanitise header cells into unique, lower-case column identifiers."""
    used = set()
    cleaned = []
    for raw in raw_headers:
        name = re.sub(r"[^a-zA-Z0-9_]", "_", raw.strip())
        if re.match(r"^[0-9]", name):
            name = "c_" + name
        if not name:
            name = "col"
        name = name.lower()
        # Two headers can sanitise to the same name; suffix later ones so the
        # DDL never declares a column twice.
        candidate, suffix = name, 1
        while candidate in used:
            suffix += 1
            candidate = f"{name}_{suffix}"
        used.add(candidate)
        cleaned.append(candidate)
    return cleaned


with open(LOCAL_FILE, newline='') as f:
    # Use the configured delimiter so parsing agrees with FIELDS TERMINATED BY.
    reader = csv.reader(f, delimiter=CSV_DELIMITER)
    header = next(reader, None)
    if header is None:
        raise SystemExit(f"{LOCAL_FILE} is empty; cannot infer a schema")
    sample = list(islice(reader, SAMPLE_SIZE))

clean_headers = clean_column_names(header)
types = sniff_types(sample, header)
schema_pairs = [f"{c} {t}" for c, t in zip(clean_headers, types)]
ddl_cols = ",\n  ".join(schema_pairs)
create_stmt = f"""
CREATE EXTERNAL TABLE IF NOT EXISTS {HIVE_DB}.{HIVE_TABLE} (
  {ddl_cols}
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '{CSV_DELIMITER}'
STORED AS TEXTFILE
LOCATION '{HDFS_BASE_DIR}'
TBLPROPERTIES ("skip.header.line.count"="1");
""".strip()

# Save DDL to a temp SQL file for execution
Path("create_table.sql").write_text(create_stmt)
print("Saved Hive DDL to create_table.sql:\n", create_stmt)
PY

echo "Creating external table via Hive..."
hive -f create_table.sql
echo "Describe table:"
hive -e "USE ${HIVE_DB}; DESCRIBE EXTENDED ${HIVE_TABLE};"

In [None]:
%%bash -s "$HIVE_DB" "$HIVE_TABLE"
set -euo pipefail
# Hive database and table names arrive as positional arguments.
HIVE_DB="$1"; HIVE_TABLE="$2";

echo "Row count:"
hive -e "SELECT COUNT(*) AS row_count FROM ${HIVE_DB}.${HIVE_TABLE};"

# bash's builtin echo prints "\n" literally unless given -e, so printf supplies
# the intended blank line before the heading.
printf '\n%s\n' "Preview 10 rows:"
hive -e "SELECT * FROM ${HIVE_DB}.${HIVE_TABLE} LIMIT 10;"

In [None]:
%%bash -s "$HIVE_DB" "$HIVE_TABLE"
set -euo pipefail
# Hive database and table names arrive as positional arguments.
HIVE_DB="$1"; HIVE_TABLE="$2";

# Both queries are best-effort: they reference columns that may not exist in
# every dataset, so `|| true` keeps a failed query from aborting the cell.
echo "Top 10 names by total population estimate (if columns exist):"
hive -e "
USE ${HIVE_DB};
SET hive.mapred.mode=nonstrict;
SELECT name, SUM(COALESCE(popeSTIMATE2020, 0)) AS total_est_2020
FROM ${HIVE_TABLE}
GROUP BY name
ORDER BY total_est_2020 DESC
LIMIT 10;" || true

# bash's builtin echo prints "\n" literally unless given -e, so printf supplies
# the intended blank line before the heading.
printf '\n%s\n' "Age distribution (if 'age' column exists):"
hive -e "
USE ${HIVE_DB};
SELECT age, COUNT(1) AS rows_per_age
FROM ${HIVE_TABLE}
GROUP BY age
ORDER BY age
LIMIT 20;" || true

In [None]:
%%bash -s "$DATASET_URL" "$LOCAL_FILE" "$HDFS_BASE_DIR" "$HIVE_DB" "$HIVE_TABLE" "$CSV_DELIMITER"
set -euo pipefail
# Generates refresh_census.sh, a standalone script that repeats the whole
# download -> HDFS -> Hive pipeline outside the notebook.
#
# The script body has to go through a *quoted* heredoc (it contains its own
# "$VAR" references and a nested Python heredoc), so the notebook's current
# settings cannot be interpolated there. They are written first, as defaults,
# by emit_default; the generated script therefore runs with no environment set
# up, while still allowing any value to be overridden via the environment.

# emit_default NAME VALUE -> prints `NAME=${NAME:-<VALUE>}`.
# %q shell-quotes VALUE so characters such as "$" or spaces stay literal.
emit_default() {
  printf '%s=${%s:-%q}\n' "$1" "$1" "$2"
}

{
  printf '%s\n' '#!/usr/bin/env bash' 'set -euo pipefail' ''
  printf '%s\n' '# Defaults captured from the notebook configuration; override via the environment.'
  emit_default DATASET_URL "$1"
  emit_default LOCAL_FILE "$2"
  emit_default HDFS_BASE_DIR "$3"
  emit_default HIVE_DB "$4"
  emit_default HIVE_TABLE "$5"
  emit_default CSV_DELIMITER "$6"
  cat <<'EOF'
# The embedded Python step reads these from its environment.
export DATASET_URL LOCAL_FILE HDFS_BASE_DIR HIVE_DB HIVE_TABLE CSV_DELIMITER

echo "[1/5] Downloading $DATASET_URL ..."
wget -O "$LOCAL_FILE" "$DATASET_URL"

echo "[2/5] Creating HDFS dir $HDFS_BASE_DIR ..."
hadoop fs -mkdir -p "$HDFS_BASE_DIR"

echo "[3/5] Uploading to HDFS ..."
hadoop fs -put -f "$LOCAL_FILE" "$HDFS_BASE_DIR/$LOCAL_FILE"

echo "[4/5] Building Hive DDL from CSV header ..."
python - <<'PY'
import csv, os, re
from itertools import islice

LOCAL_FILE = os.environ['LOCAL_FILE']
HIVE_DB = os.environ['HIVE_DB']
HIVE_TABLE = os.environ['HIVE_TABLE']
CSV_DELIMITER = os.environ['CSV_DELIMITER']
HDFS_BASE_DIR = os.environ['HDFS_BASE_DIR']
SAMPLE_SIZE = 100  # data rows inspected when guessing column types


def sniff_types(sample_rows, headers):
    """Return one Hive type (BIGINT, DOUBLE or STRING) per header."""
    def parses_as(cast, text):
        try:
            cast(text)
        except ValueError:
            return False
        return True

    types = []
    for col_idx in range(len(headers)):
        col_vals = [r[col_idx] for r in sample_rows if len(r) > col_idx and r[col_idx] != ""]
        if col_vals and all(parses_as(int, v) for v in col_vals):
            types.append('BIGINT')
        elif col_vals and all(parses_as(float, v) for v in col_vals):
            types.append('DOUBLE')
        else:
            types.append('STRING')
    return types


def clean_column_names(raw_headers):
    """Sanitise header cells into unique, lower-case column identifiers."""
    used = set()
    cleaned = []
    for raw in raw_headers:
        name = re.sub(r"[^a-zA-Z0-9_]", "_", raw.strip())
        if re.match(r"^[0-9]", name):
            name = "c_" + name
        if not name:
            name = "col"
        name = name.lower()
        # Suffix repeated names so the DDL never declares a column twice.
        candidate, suffix = name, 1
        while candidate in used:
            suffix += 1
            candidate = f"{name}_{suffix}"
        used.add(candidate)
        cleaned.append(candidate)
    return cleaned


with open(LOCAL_FILE, newline='') as f:
    reader = csv.reader(f, delimiter=CSV_DELIMITER)
    header = next(reader, None)
    if header is None:
        raise SystemExit(f"{LOCAL_FILE} is empty; cannot infer a schema")
    # islice stops at end of file, so short files do not raise StopIteration.
    sample = list(islice(reader, SAMPLE_SIZE))

clean_headers = clean_column_names(header)
types = sniff_types(sample, header)
ddl_cols = ",\n  ".join([f"{c} {t}" for c, t in zip(clean_headers, types)])
stmt = f"""
CREATE DATABASE IF NOT EXISTS {HIVE_DB};
DROP TABLE IF EXISTS {HIVE_DB}.{HIVE_TABLE};
CREATE EXTERNAL TABLE IF NOT EXISTS {HIVE_DB}.{HIVE_TABLE} (
  {ddl_cols}
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '{CSV_DELIMITER}'
STORED AS TEXTFILE
LOCATION '{HDFS_BASE_DIR}'
TBLPROPERTIES ("skip.header.line.count"="1");
"""
with open('refresh_create.sql', 'w') as out:
    out.write(stmt)
print(stmt)
PY

echo "[5/5] Applying Hive DDL ..."
hive -f refresh_create.sql
echo "Done."
EOF
} > refresh_census.sh

chmod +x refresh_census.sh
echo "Created refresh_census.sh"
ls -lh refresh_census.sh

In [None]:
%%bash -s "$HIVE_DB" "$HIVE_TABLE"
set -euo pipefail
# Hive database and table names arrive as positional arguments.
HIVE_DB="$1"; HIVE_TABLE="$2";

# Show at most the first 50 database names.
echo "Databases:"; hive -e "SHOW DATABASES;" | sed -n '1,50p'

# bash's builtin echo prints "\n" literally unless given -e, so printf supplies
# the intended blank line before each heading.
printf '\n%s\n' "Tables in ${HIVE_DB}:"; hive -e "USE ${HIVE_DB}; SHOW TABLES;"
printf '\n%s\n' "Sample data:"; hive -e "SELECT * FROM ${HIVE_DB}.${HIVE_TABLE} LIMIT 5;"