In [18]:
import itertools
import json
import math
import os
import sqlite3

import pandas as pd
import requests

In [19]:
# Base URL of the dummyapi.io REST API; resource names are appended to it.
url = 'https://dummyapi.io/data/v1/'
# The app id is a credential, so it is read from the DUMMYAPI_APP_ID
# environment variable when that is set. The literal fallback only keeps
# existing runs working; remove it once the variable is set wherever this
# notebook runs.
header = {'app-id': os.environ.get('DUMMYAPI_APP_ID', '628d03c25f1069a96e4846cd')}
# Page size requested from the list endpoints.
limit = 50

In [20]:
# Fetching full data of posts and users
def get_posts_users(filename):
    """Download the full record of every item of one API resource.

    The list endpoint (``url + filename``) is paged through to collect the
    item ids, then each item is requested individually from
    ``url + filename + '/' + id`` and the decoded JSON objects are gathered
    into a DataFrame.

    Parameters
    ----------
    filename : str
        Resource name appended to the module-level ``url``
        (this notebook calls it with 'post' and 'user').

    Returns
    -------
    pandas.DataFrame
        One row per item; empty when the endpoint reports no items.

    Raises
    ------
    requests.HTTPError
        If any request comes back with an error status, instead of letting
        the error payload end up as a row of the result.
    """
    endpoint = url + filename

    def fetch_page(page):
        # One page of the list endpoint, decoded from JSON.
        response = requests.get(endpoint, params={'page': page, 'limit': limit}, headers=header)
        response.raise_for_status()
        return response.json()

    first_page = fetch_page(0)
    pages = int(math.ceil(first_page['total'] / first_page['limit']))

    # Page 0 is already in hand, so only the remaining pages are requested.
    # Starting from its ids also keeps the list defined when there are no
    # pages at all.
    id_list = [d['id'] for d in first_page['data']]
    for i in range(1, pages):
        id_list.extend(d['id'] for d in fetch_page(i)['data'])

    full_data = []
    for item_id in id_list:
        full_data_request = requests.get(endpoint + '/' + item_id, headers=header)
        full_data_request.raise_for_status()
        full_data.append(full_data_request.json())

    return pd.DataFrame(full_data)
        

In [None]:
# Download every post and every user in full. get_posts_users issues one HTTP
# request per item on top of the paging requests.
posts = get_posts_users('post')
users = get_posts_users('user')

In [None]:
#Fetching Preview data of comments
def get_comments(filename):
    """Download every record exposed by one list endpoint of the API.

    Unlike ``get_posts_users`` no per-item request is made: the records are
    kept exactly as the list endpoint (``url + filename``) returns them.

    Parameters
    ----------
    filename : str
        Resource name appended to the module-level ``url``
        (this notebook calls it with 'comment').

    Returns
    -------
    pandas.DataFrame
        One row per record; empty when the endpoint reports no records.

    Raises
    ------
    requests.HTTPError
        If any request comes back with an error status.
    """
    endpoint = url + filename

    def fetch_page(page):
        # One page of the list endpoint, decoded from JSON.
        response = requests.get(endpoint, params={'page': page, 'limit': limit}, headers=header)
        response.raise_for_status()
        return response.json()

    first_page = fetch_page(0)
    pages = int(math.ceil(first_page['total'] / first_page['limit']))

    # Page 0 is already in hand, so only the remaining pages are requested.
    # Starting from its records also keeps the list defined when there are
    # no pages at all.
    records = list(first_page['data'])
    for i in range(1, pages):
        records.extend(fetch_page(i)['data'])

    return pd.DataFrame(records)

In [None]:
# Page through the 'comment' list endpoint and keep the records as returned.
comments_df = get_comments('comment')

In [None]:
# Setting owner_id as a separate column in posts and comments dataframe
def get_owner_id(dataframe):
    """Return a copy of ``dataframe`` with ``owner`` replaced by ``owner_id``.

    Each value of the ``owner`` column is indexed with ``'id'`` and the
    result is stored in a new ``owner_id`` column placed after the remaining
    columns.

    The input frame is left untouched, so the calling cell can be re-run
    without failing on an ``owner`` column that a previous run removed.

    Parameters
    ----------
    dataframe : pandas.DataFrame
        Frame with an ``owner`` column whose values support ``value['id']``.

    Returns
    -------
    pandas.DataFrame
        New frame without ``owner`` and with ``owner_id``.
    """
    owner_ids = dataframe['owner'].apply(lambda owner: owner['id'])
    return dataframe.drop('owner', axis=1).assign(owner_id=owner_ids)

In [None]:
# Flatten the nested `owner` value of each post into an `owner_id` column.
cleansed_posts = get_owner_id(posts)

In [None]:
# Flatten the nested `owner` value of each comment into an `owner_id` column.
cleansed_comments = get_owner_id(comments_df)

In [None]:
# Expand each `location` value into its own columns and put them next to the
# remaining user columns (assumes `location` holds dict-like values - TODO confirm).
cleansed_users = pd.concat([users.drop(['location'], axis=1), users['location'].apply(pd.Series)], axis=1)

In [None]:
#Convert string columns to datetime in order to be ready for database insertion
def convert_str_to_date(dataframe):
    """Parse every column whose name contains 'date' into datetimes, in place.

    The match on the column name is case-insensitive. Matching columns are
    overwritten on ``dataframe`` itself with the result of
    ``pd.to_datetime``; all other columns are left alone.

    Parameters
    ----------
    dataframe : pandas.DataFrame
        Frame to modify.

    Returns
    -------
    None
    """
    for column in dataframe.columns:
        # str() lets non-string labels (e.g. integer column names) be skipped
        # instead of raising AttributeError on .lower().
        if 'date' in str(column).lower():
            dataframe[column] = pd.to_datetime(dataframe[column])

In [None]:
# Parse the columns whose name contains 'date' on each frame; the frames are
# modified in place, so nothing is assigned here.
convert_str_to_date(cleansed_users)
convert_str_to_date(cleansed_comments)
convert_str_to_date(cleansed_posts)

In [None]:
#Data Validation Checks
# True when no user id occurs more than once.
cleansed_users['id'].is_unique

In [None]:
# True when no two users share the same values across all non-id columns.
cleansed_users.drop(columns='id').drop_duplicates().shape[0] == cleansed_users.shape[0]

In [None]:
# Distinct values found in the `title` column, in order of first appearance.
list(pd.unique(cleansed_users['title']))

In [None]:
# Distinct values found in the `gender` column, in order of first appearance.
list(pd.unique(cleansed_users['gender']))

In [None]:
# Number of users whose registerDate is earlier than their updatedDate.
int((cleansed_users['registerDate'] < cleansed_users['updatedDate']).sum())

In [None]:
def id_uniqueness(dataframe, id_column):
    """Print whether every value in ``id_column`` occurs only once.

    Prints 'id column is unique' when there are no repeated values and
    'not unique' otherwise. Nothing is returned.
    """
    ids = dataframe[id_column]
    message = 'id column is unique' if ids.is_unique else 'not unique'
    print(message)
    
def data_uniqueness(dataframe, id_column):
    """Print whether the rows are distinct once ``id_column`` is ignored.

    Prints 'dataset is unique' when dropping duplicate rows (with the id
    column removed) keeps every row, and 'not unique' otherwise. Nothing is
    returned.
    """
    payload = dataframe.drop(id_column, axis=1)
    has_duplicates = len(payload.drop_duplicates()) != len(dataframe)
    if has_duplicates:
        print('not unique')
    else:
        print('dataset is unique')
    
def check_not_null_columns(dataframe, column_names):
    """Print whether the given column(s) are free of null values.

    Prints 'the column includes null values' when at least one of the
    columns holds a null, and 'no null values' otherwise. Nothing is
    returned.

    The check looks at the null flags themselves. Counting how many distinct
    flags were collected, as was done before, always gave 1 for a single
    column, so nulls were never reported.

    Parameters
    ----------
    dataframe : pandas.DataFrame
        Frame to inspect.
    column_names : label or list of labels
        A single column label, or a list / set / Index of labels.
    """
    # A single label is wrapped so one name and several names share a path.
    if not isinstance(column_names, (list, set, pd.Index)):
        column_names = [column_names]
    has_nulls = dataframe[list(column_names)].isnull().values.any()
    if has_nulls:
        print('the column includes null values')
    else:
        print('no null values')

def positive_values(dataframe, column_names):
    """Print whether the given column(s) are free of negative values.

    Prints 'negative values included' when at least one value in the
    columns is below zero, and 'no negative values' otherwise. Nothing is
    returned.

    The check looks at the comparison results themselves. Counting how many
    distinct results were collected, as was done before, always gave 1 for a
    single column, so negatives were never reported.

    Parameters
    ----------
    dataframe : pandas.DataFrame
        Frame to inspect.
    column_names : label or list of labels
        A single column label, or a list / set / Index of labels. The
        columns must be comparable with ``0``.
    """
    # A single label is wrapped so one name and several names share a path.
    if not isinstance(column_names, (list, set, pd.Index)):
        column_names = [column_names]
    has_negatives = (dataframe[list(column_names)] < 0).values.any()
    if has_negatives:
        print('negative values included')
    else:
        print('no negative values')

In [None]:
#Creating Staging Layer of the data pipeline
import sqlite3

# Connect to the SQLite database file 'dummy_stg' (path is relative to the
# working directory); the connection is shared by every cell below.
conn = sqlite3.connect('dummy_stg') 
# Cursor used to run the CREATE TABLE statements.
c = conn.cursor()

In [None]:
# Print the CREATE TABLE statement pandas derives for the users frame
# (presumably the basis for the hand-written DDL in the next cell).
print(pd.io.sql.get_schema(cleansed_users,name='stg_users',con=conn))

In [None]:
# Create the staging table for users. IF NOT EXISTS keeps this cell
# re-runnable: the database file outlives the kernel, so a plain CREATE TABLE
# fails on the second run because the table is already there.
c.execute('''
          CREATE TABLE IF NOT EXISTS "stg_users" (
                  "id" TEXT,
                  "title" TEXT,
                  "firstName" TEXT,
                  "lastName" TEXT,
                  "picture" TEXT,
                  "gender" TEXT,
                  "email" TEXT,
                  "dateOfBirth" TIMESTAMP,
                  "phone" TEXT,
                  "registerDate" TIMESTAMP,
                  "updatedDate" TIMESTAMP,
                  "street" TEXT,
                  "city" TEXT,
                  "state" TEXT,
                  "country" TEXT,
                  "timezone" TEXT
                )
          ''')

In [None]:
# Load the user rows into stg_users. if_exists='append' inserts into the
# existing table, so running this cell again loads the same rows a second time.
cleansed_users.to_sql(name='stg_users', con=conn, if_exists='append', index=False)

In [None]:
# Print the CREATE TABLE statement pandas derives for the posts frame
# (presumably the basis for the hand-written DDL in the next cell).
print(pd.io.sql.get_schema(cleansed_posts,name='stg_posts',con=conn))

In [None]:
# Create the staging table for posts. IF NOT EXISTS keeps this cell
# re-runnable: the database file outlives the kernel, so a plain CREATE TABLE
# fails on the second run because the table is already there.
c.execute('''
          CREATE TABLE IF NOT EXISTS "stg_posts" (
              "id" TEXT,
              "image" TEXT,
              "likes" INTEGER,
              "link" TEXT,
              "tags" TEXT,
              "text" TEXT,
              "publishDate" TIMESTAMP,
              "owner_id" TEXT
            )
          ''')

In [None]:
# This bare expression shows nothing: only the last expression of a cell is
# displayed.
cleansed_posts.dtypes
# Cast the columns declared as TEXT in stg_posts to str before loading.
# NOTE(review): astype('str') also turns missing values into literal strings
# such as 'nan' / 'None' - confirm that is acceptable for the staging table.
cleansed_posts['id'] = cleansed_posts['id'].astype('str')
cleansed_posts['image'] = cleansed_posts['image'].astype('str')
cleansed_posts['link'] = cleansed_posts['link'].astype('str')
# presumably `tags` holds list values that cannot be stored as-is - verify.
cleansed_posts['tags'] = cleansed_posts['tags'].astype('str')
cleansed_posts['text'] = cleansed_posts['text'].astype('str')
cleansed_posts['owner_id'] = cleansed_posts['owner_id'].astype('str')

In [None]:
# Load the post rows into stg_posts. if_exists='append' inserts into the
# existing table, so running this cell again loads the same rows a second time.
cleansed_posts.to_sql(name='stg_posts', con=conn, index=False, if_exists='append')

In [None]:
# Print the CREATE TABLE statement pandas derives for the comments frame
# (presumably the basis for the hand-written DDL in the next cell).
print(pd.io.sql.get_schema(cleansed_comments,name='stg_comments',con=conn))

In [None]:
# Create the staging table for comments. IF NOT EXISTS keeps this cell
# re-runnable: the database file outlives the kernel, so a plain CREATE TABLE
# fails on the second run because the table is already there.
c.execute('''
          CREATE TABLE IF NOT EXISTS "stg_comments" (
              "id" TEXT,
              "message" TEXT,
              "post" TEXT,
              "publishDate" TIMESTAMP,
              "owner_id" TEXT
)
          ''')

In [None]:
# Load the comment rows into stg_comments. if_exists='append' inserts into the
# existing table, so running this cell again loads the same rows a second time.
cleansed_comments.to_sql(name='stg_comments', con=conn, index=False, if_exists='append')