In [26]:
import numpy as np
import pandas as pd
import logging
import os
import json
from dotenv import load_dotenv
from sqlalchemy import create_engine
import psycopg2
from psycopg2.extras import execute_batch, execute_values, DictCursor, Json

In [34]:
HOME = os.getcwd()

business = HOME+'/data/yelp_academic_dataset_business.json'
checkin = HOME+'/data/yelp_academic_dataset_checkin.json'
tip = HOME+'/data/yelp_academic_dataset_tip.json'
user = HOME+'/data/yelp_academic_dataset_user.json'
review = HOME+'/data/yelp_academic_dataset_review.json'

percipitation = HOME+'/data/USW00023169-LAS_VEGAS_MCCARRAN_INTL_AP-precipitation-inch.csv'
temperature = HOME+'/data/USW00023169-temperature-degreeF.csv'


In [3]:
load_dotenv(HOME + '/.env')

DB_HOST = os.getenv("DB_HOST")
DB_PORT = os.getenv("DB_PORT")
DB_USERNAME = os.getenv("DB_USERNAME")
DB_PASSWORD = os.getenv("DB_PASSWORD")
DB_NAME = os.getenv("DB_NAME")

# BUSINESS

In [5]:
#connect to potsgresql - CREATE TABLE
try:
    conn = psycopg2.connect(f"host={DB_HOST} dbname={DB_NAME} user={DB_USERNAME} password={DB_USERNAME} port={DB_PORT}")
    print(f'Connection to {conn} Success')
except ValueError:
    print('Connection Failed')

cur  = conn.cursor()


#create table 
cur.execute("""
            CREATE TABLE IF NOT EXISTS raw.business_stg(
                business_id text,
                name text null,
                address text null,
                city text null,
                state text null,
                postal_code text null,
                latitide text null,
                longitude text null,
                stars text null,
                review_count text null,
                is_open text null,
                attributes text null,
                categories text null,
                hours text null)
"""
)
conn.commit()

Connection to <connection object at 0x11e625000; dsn: 'user=airflow password=xxx dbname=postgres host=0.0.0.0 port=5432', closed: 0> Success


In [6]:
with open(business, 'r') as f1:
    ll = [json.loads(line.strip()) for line in f1.readlines()]
    logging.info(f'Total Length Data : {len(ll)}')

chunks = 10000
total = len(ll) // chunks

for i in range(total+1):
    df = pd.DataFrame(ll[i * chunks:(i+1) * chunks]).astype('string')
    df = df.fillna('NA')
    tuples = [tuple(x) for x in df.to_numpy()]
    
    try:
        conn = psycopg2.connect(f"host={DB_HOST} dbname={DB_NAME} user={DB_USERNAME} password={DB_USERNAME} port={DB_PORT}")
        print(f'Connection to {conn} Success')
    except ValueError:
        print('Connection Failed')



    cursor = conn.cursor()
    try:
        # execute_batch(cursor, query, tuples)
        execute_batch(cursor, "INSERT INTO raw.business_stg VALUES(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)", tuples)
        conn.commit()
        print("execute_batch() done")
    except (Exception, psycopg2.DatabaseError) as error:
        print("Error: %s" % error)
        conn.rollback()
        cursor.close()

    
    cursor.close()

Connection to <connection object at 0x13e5f6b90; dsn: 'user=airflow password=xxx dbname=postgres host=0.0.0.0 port=5432', closed: 0> Success
execute_batch() done
Connection to <connection object at 0x11e625150; dsn: 'user=airflow password=xxx dbname=postgres host=0.0.0.0 port=5432', closed: 0> Success
execute_batch() done
Connection to <connection object at 0x13e5f6ce0; dsn: 'user=airflow password=xxx dbname=postgres host=0.0.0.0 port=5432', closed: 0> Success
execute_batch() done
Connection to <connection object at 0x11e6257e0; dsn: 'user=airflow password=xxx dbname=postgres host=0.0.0.0 port=5432', closed: 0> Success
execute_batch() done
Connection to <connection object at 0x11e625150; dsn: 'user=airflow password=xxx dbname=postgres host=0.0.0.0 port=5432', closed: 0> Success
execute_batch() done
Connection to <connection object at 0x11e625690; dsn: 'user=airflow password=xxx dbname=postgres host=0.0.0.0 port=5432', closed: 0> Success
execute_batch() done
Connection to <connection ob

# CHECKIN

In [7]:
#connect to potsgresql
try:
    conn = psycopg2.connect(f"host={DB_HOST} dbname={DB_NAME} user={DB_USERNAME} password={DB_USERNAME} port={DB_PORT}")
    print(f'Connection to {conn} Success')
except ValueError:
    print('Connection Failed')

cur  = conn.cursor()


#create table 
cur.execute("""
            CREATE TABLE IF NOT EXISTS raw.checkin_stg(
                business_id text null,
                date text null
            )
"""
)
conn.commit()

Connection to <connection object at 0x11e625150; dsn: 'user=airflow password=xxx dbname=postgres host=0.0.0.0 port=5432', closed: 0> Success


In [8]:
with open(checkin, 'r') as f1:
    ll = [json.loads(line.strip()) for line in f1.readlines()]
    print(len(ll))

    chunks = 10000
    total = len(ll) // chunks
    
    for i in range(total+1):
        # result.append(ll[i * chunks:(i+1) * chunks])
        # result.append(pd.DataFrame(ll[i * chunks:(i+1) * chunks]))

        df = pd.DataFrame(ll[i * chunks:(i+1) * chunks])
        tuples = [tuple(x) for x in df.to_numpy()]
        
        # cols = ','.join(list(df.columns))
        
        try:
            conn = psycopg2.connect(f"host={DB_HOST} dbname={DB_NAME} user={DB_USERNAME} password={DB_USERNAME} port={DB_PORT}")
            print(f'Connection to {conn} Success')
        except ValueError:
            print('Connection Failed')

    
        cursor = conn.cursor()
        try:
            # execute_batch(cursor, query, tuples)
            execute_batch(cursor, "INSERT INTO raw.checkin_stg VALUES(%s,%s)", tuples)
            conn.commit()
            print("execute_batch() done")
        except (Exception, psycopg2.DatabaseError) as error:
            print("Error: %s" % error)
            conn.rollback()
            cursor.close()

        cursor.close()


131930
Connection to <connection object at 0x11e625540; dsn: 'user=airflow password=xxx dbname=postgres host=0.0.0.0 port=5432', closed: 0> Success
execute_batch() done
Connection to <connection object at 0x11e625bd0; dsn: 'user=airflow password=xxx dbname=postgres host=0.0.0.0 port=5432', closed: 0> Success
execute_batch() done
Connection to <connection object at 0x11e625000; dsn: 'user=airflow password=xxx dbname=postgres host=0.0.0.0 port=5432', closed: 0> Success
execute_batch() done
Connection to <connection object at 0x11e625540; dsn: 'user=airflow password=xxx dbname=postgres host=0.0.0.0 port=5432', closed: 0> Success
execute_batch() done
Connection to <connection object at 0x11e625bd0; dsn: 'user=airflow password=xxx dbname=postgres host=0.0.0.0 port=5432', closed: 0> Success
execute_batch() done
Connection to <connection object at 0x11e625000; dsn: 'user=airflow password=xxx dbname=postgres host=0.0.0.0 port=5432', closed: 0> Success
execute_batch() done
Connection to <connec

# TIP

In [9]:
#connect to potsgresql
try:
    conn = psycopg2.connect(f"host={DB_HOST} dbname={DB_NAME} user={DB_USERNAME} password={DB_USERNAME} port={DB_PORT}")
    print(f'Connection to {conn} Success')
except ValueError:
    print('Connection Failed')

cur  = conn.cursor()


#create table 
cur.execute("""
            CREATE TABLE IF NOT EXISTS raw.tip_stg(
                user_id text,
                business_id text null,
                text text null,
                date text null,
                compliment_count numeric null
            )
"""
)
conn.commit()

Connection to <connection object at 0x11e625540; dsn: 'user=airflow password=xxx dbname=postgres host=0.0.0.0 port=5432', closed: 0> Success


In [10]:
with open(tip, 'r') as f1:
    ll = [json.loads(line.strip()) for line in f1.readlines()]
    print(len(ll))

    chunks = 10000
    total = len(ll) // chunks
    
    for i in range(total+1):
        # result.append(ll[i * chunks:(i+1) * chunks])
        # result.append(pd.DataFrame(ll[i * chunks:(i+1) * chunks]))

        df = pd.DataFrame(ll[i * chunks:(i+1) * chunks]).astype('string')
        tuples = [tuple(x) for x in df.to_numpy()]
        
        # cols = ','.join(list(df.columns))
        
        try:
            conn = psycopg2.connect(f"host={DB_HOST} dbname={DB_NAME} user={DB_USERNAME} password={DB_USERNAME} port={DB_PORT}")
            print(f'Connection to {conn} Success')
        except ValueError:
            print('Connection Failed')

    
        cursor = conn.cursor()
        try:
            # execute_batch(cursor, query, tuples)
            execute_batch(cursor, "INSERT INTO raw.tip_stg VALUES(%s,%s,%s,%s,%s)", tuples)
            conn.commit()
        except (Exception, psycopg2.DatabaseError) as error:
            print("Error: %s" % error)
            conn.rollback()
            cursor.close()

        print("execute_batch() done")
        cursor.close()

        # cur.execute("INSERT INTO busines VALUES ( default, %s, %s, %s,%s) ON CONFLICT DO NOTHING", row)

908915
Connection to <connection object at 0x11e625150; dsn: 'user=airflow password=xxx dbname=postgres host=0.0.0.0 port=5432', closed: 0> Success
execute_batch() done
Connection to <connection object at 0x11e6257e0; dsn: 'user=airflow password=xxx dbname=postgres host=0.0.0.0 port=5432', closed: 0> Success
execute_batch() done
Connection to <connection object at 0x11e6252a0; dsn: 'user=airflow password=xxx dbname=postgres host=0.0.0.0 port=5432', closed: 0> Success
execute_batch() done
Connection to <connection object at 0x11e625150; dsn: 'user=airflow password=xxx dbname=postgres host=0.0.0.0 port=5432', closed: 0> Success
execute_batch() done
Connection to <connection object at 0x11e625930; dsn: 'user=airflow password=xxx dbname=postgres host=0.0.0.0 port=5432', closed: 0> Success
execute_batch() done
Connection to <connection object at 0x11e625690; dsn: 'user=airflow password=xxx dbname=postgres host=0.0.0.0 port=5432', closed: 0> Success
execute_batch() done
Connection to <connec

# USER

In [4]:
#connect to potsgresql
try:
    conn = psycopg2.connect(f"host={DB_HOST} dbname={DB_NAME} user={DB_USERNAME} password={DB_USERNAME} port={DB_PORT}")
    print(f'Connection to {conn} Success')
except ValueError:
    print('Connection Failed')

cur  = conn.cursor()


#create table 
cur.execute("""
            CREATE TABLE IF NOT EXISTS raw.user_stg(
                user_id text,
                name text null,
                review_count numeric null,
                yelping_since text null,
                useful numeric null,
                funny numeric null,
                cool numeric null,
                elite text null,
                friends text null,
                fans numeric null,
                average_stars numeric null,
                compliment_hot numeric null,
                compliment_more numeric null,
                compliment_profile numeric null,
                compliment_cute numeric null,
                compliment_list numeric null,
                compliment_note numeric null,
                compliment_plain numeric null,
                compliment_cool numeric null,
                compliment_funny numeric null,
                compliment_writer numeric null,
                compliment_photos numeric null

            )
"""
)
conn.commit()

Connection to <connection object at 0x1111c1150; dsn: 'user=airflow password=xxx dbname=postgres host=0.0.0.0 port=5432', closed: 0> Success


In [5]:
with open(user, 'r') as f1:
    ll = [json.loads(line.strip()) for line in f1.readlines()]
    print(len(ll))

    chunks = 10000
    total = len(ll) // chunks
    
    for i in range(total+1):
        # result.append(ll[i * chunks:(i+1) * chunks])
        # result.append(pd.DataFrame(ll[i * chunks:(i+1) * chunks]))

        df = pd.DataFrame(ll[i * chunks:(i+1) * chunks])
        tuples = [tuple(x) for x in df.to_numpy()]
        
        # cols = ','.join(list(df.columns))
        
        try:
            conn = psycopg2.connect(f"host={DB_HOST} dbname={DB_NAME} user={DB_USERNAME} password={DB_USERNAME} port={DB_PORT}")
            print(f'Connection to {conn} Success')
        except ValueError:
            print('Connection Failed')

        # query  = "INSERT INTO %s(%s) VALUES(%%s,%%s,%%s,%%s,%%s)" % ('tip_stg', cols)
        # print(query)

    
        cursor = conn.cursor()
        try:
            # execute_batch(cursor, query, tuples)
            execute_batch(cursor, "INSERT INTO raw.user_stg VALUES(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)", tuples)
            conn.commit()
        except (Exception, psycopg2.DatabaseError) as error:
            print("Error: %s" % error)
            conn.rollback()
            cursor.close()

        print("execute_batch() done")
        cursor.close()

1987897
Connection to <connection object at 0x565671fc0; dsn: 'user=airflow password=xxx dbname=postgres host=0.0.0.0 port=5432', closed: 0> Success
execute_batch() done
Connection to <connection object at 0x565672500; dsn: 'user=airflow password=xxx dbname=postgres host=0.0.0.0 port=5432', closed: 0> Success
execute_batch() done
Connection to <connection object at 0x565672260; dsn: 'user=airflow password=xxx dbname=postgres host=0.0.0.0 port=5432', closed: 0> Success
execute_batch() done
Connection to <connection object at 0x565672500; dsn: 'user=airflow password=xxx dbname=postgres host=0.0.0.0 port=5432', closed: 0> Success
execute_batch() done
Connection to <connection object at 0x5656727a0; dsn: 'user=airflow password=xxx dbname=postgres host=0.0.0.0 port=5432', closed: 0> Success
execute_batch() done
Connection to <connection object at 0x565672f80; dsn: 'user=airflow password=xxx dbname=postgres host=0.0.0.0 port=5432', closed: 0> Success
execute_batch() done
Connection to <conne

# REVIEW

In [6]:
#connect to potsgresql
try:
    conn = psycopg2.connect(f"host={DB_HOST} dbname={DB_NAME} user={DB_USERNAME} password={DB_USERNAME} port={DB_PORT}")
    print(f'Connection to {conn} Success')
except ValueError:
    print('Connection Failed')

cur  = conn.cursor()


#create table 
cur.execute("""
            CREATE TABLE IF NOT EXISTS raw.review_stg(
                review_id text,
                user_id text null,
                business_id text null,
                stars text null,
                usefull text null,
                funny numeric null,
                cool numeric null,
                text text null,
                date text null
            )
"""
)
conn.commit()

Connection to <connection object at 0x128079540; dsn: 'user=airflow password=xxx dbname=postgres host=0.0.0.0 port=5432', closed: 0> Success


In [7]:
iteration = 0
with open(review, 'r') as f1:
    ll = [json.loads(line.strip()) for line in f1.readlines()]
    print(len(ll))

    chunks = 100000
    total = len(ll) // chunks
    
    for i in range(total+1):
        # result.append(ll[i * chunks:(i+1) * chunks])
        # result.append(pd.DataFrame(ll[i * chunks:(i+1) * chunks]))

        df = pd.DataFrame(ll[i * chunks:(i+1) * chunks])
        tuples = [tuple(x) for x in df.to_numpy()]
        # print(tuples)
        
        cols = ','.join(list(df.columns))
        
        try:
            conn = psycopg2.connect(f"host={DB_HOST} dbname={DB_NAME} user={DB_USERNAME} password={DB_USERNAME} port={DB_PORT}")
            print(f'Connection to {conn} Success')
        except ValueError:
            print('Connection Failed')

    
        cursor = conn.cursor()
        iteration += 1
        try:
            # execute_batch(cursor, query, tuples)
            execute_batch(cursor, "INSERT INTO raw.review_stg VALUES(%s,%s,%s,%s,%s,%s,%s,%s,%s)", tuples)
            conn.commit()
            print(f"execute_batch() done : iteration {iteration}")
        except (Exception, psycopg2.DatabaseError) as error:
            print("Error: %s" % error)
            conn.rollback()
            cursor.close()


        cursor.close()

6990280
Connection to <connection object at 0x128079690; dsn: 'user=airflow password=xxx dbname=postgres host=0.0.0.0 port=5432', closed: 0> Success
execute_batch() done : iteration 1
Connection to <connection object at 0x1280797e0; dsn: 'user=airflow password=xxx dbname=postgres host=0.0.0.0 port=5432', closed: 0> Success
execute_batch() done : iteration 2
Connection to <connection object at 0x128079d20; dsn: 'user=airflow password=xxx dbname=postgres host=0.0.0.0 port=5432', closed: 0> Success
execute_batch() done : iteration 3
Connection to <connection object at 0x128079bd0; dsn: 'user=airflow password=xxx dbname=postgres host=0.0.0.0 port=5432', closed: 0> Success
execute_batch() done : iteration 4
Connection to <connection object at 0x1280797e0; dsn: 'user=airflow password=xxx dbname=postgres host=0.0.0.0 port=5432', closed: 0> Success
execute_batch() done : iteration 5
Connection to <connection object at 0x128079d20; dsn: 'user=airflow password=xxx dbname=postgres host=0.0.0.0 po

In [42]:
try:
    engine = create_engine(f"postgresql+psycopg2://{DB_USERNAME}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}")
    logging.info(f'Connection to {conn} Success')
except (Exception, psycopg2.DatabaseError) as error:
    logging.info(f"Error: {error}")
    conn.rollback()

In [39]:
try:
    engine = create_engine(f"postgresql+psycopg2://{DB_USERNAME}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}")
    print('Success')
except ValueError:
    print('failed to connect')

Success


In [41]:
df = pd.read_csv(percipitation)
df['precipitation'] = df['precipitation'].replace('T', None)
df.to_sql('percipitation_stg', engine, if_exists='replace', schema='raw')

241

## PERCIPITATION

In [43]:
try:
    engine = create_engine(f"postgresql+psycopg2://{DB_USERNAME}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}")
    df = pd.read_csv(percipitation)
    df['precipitation'] = df['precipitation'].replace('T', None)
    df.to_sql('percipitation_stg', engine, if_exists='replace', schema='raw', index=False)
    print(f'Success')
except ValueError:
    print('Failed')

Success


## TEMPERATURE

In [35]:
df = pd.read_csv(temperature)
df.head()

Unnamed: 0,date,min,max,normal_min,normal_max
0,19480906,105.0,61.0,98.7,76.1
1,19480907,104.0,60.0,98.4,75.8
2,19480908,103.0,64.0,98.0,75.4
3,19480909,106.0,62.0,97.7,75.1
4,19480910,106.0,61.0,97.3,74.7


In [36]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 28241 entries, 0 to 28240
Data columns (total 5 columns):
 #   Column      Non-Null Count  Dtype  
---  ------      --------------  -----  
 0   date        28241 non-null  int64  
 1   min         26588 non-null  float64
 2   max         26588 non-null  float64
 3   normal_min  28241 non-null  float64
 4   normal_max  28241 non-null  float64
dtypes: float64(4), int64(1)
memory usage: 1.1 MB


In [38]:
try:
    engine = create_engine(f"postgresql+psycopg2://{DB_USERNAME}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}")
    df = pd.read_csv(temperature)
    df.to_sql('temperature_stg', engine, if_exists='replace', schema='raw', index=False)
    print(f'Success')
except ValueError:
    print('Failed')

Success
