# Data Wharehouse Course- ETL Task

###### Naom Tor, Tomer Yanay & Yogev Matalon, 7.6.2017

# 1. Creating the Scheme in the Local Host

### Create MYSQL connector

In [1]:
import MySQLdb as mdb

In [2]:
# creating connection
try:
    con = mdb.connect(
                   host = '127.0.0.1', user = 'root', passwd = 'root', use_unicode=True, charset="utf8")
    print "The password is `root`"
except:
    print "The password in not `root`. trying `1234`"
    con = mdb.connect(
                   host = '127.0.0.1', user = 'root', passwd = '1234', use_unicode=True, charset="utf8")

The password in not `root`. trying `1234`


In [3]:
# setting a cursor
cursor = con.cursor()     # get the cursor

### DDL - Create Schema

In [4]:
cursor.execute("DROP SCHEMA IF EXISTS Yelp_Pittsburgh ;")
cursor.execute("CREATE SCHEMA IF NOT EXISTS Yelp_Pittsburgh ;")
cursor.execute("USE Yelp_Pittsburgh ;")
cursor.execute('SET SQL_MODE=ANSI_QUOTES') #important for petl operation - use this kind of quotes

0L

### Create Tables

In [5]:
#dim_users table creation
cursor.execute("""
    CREATE TABLE IF NOT EXISTS dim_users (
        `user_id` INT(11) NOT NULL AUTO_INCREMENT,
        `yelp_user_id` VARCHAR(20) NOT NULL,
        `user_review_count` INT(11),
        `fans` INT(11),
        `is_elite` BOOLEAN NOT NULL,
        `yelping_since` DATE,
        `friends_count` INT(11) NOT NULL,
        `average_stars` FLOAT(11),
        PRIMARY KEY (`user_id`));
        """)

0L

In [6]:
#dim_business table creation
cursor.execute("""
    CREATE TABLE IF NOT EXISTS dim_business (       
        `business_id` INT(11) NOT NULL AUTO_INCREMENT,
        `Yelp_business_id` VARCHAR(20) NOT NULL,
        `business_name` VARCHAR(20) NOT NULL,
        `is_open` BOOLEAN NOT NULL,
        `neighborhood` VARCHAR(20),
        `latitude` FLOAT(11),
        `longitude` FLOAT(11),
        `business_stars` FLOAT(11),
        `business_review_count` INT(11) NOT NULL,
        `food` BOOLEAN NOT NULL,
        `art & enteraitment` BOOLEAN NOT NULL,
        `stores` BOOLEAN NOT NULL,
        `beauty & spa` BOOLEAN NOT NULL,
        `health` BOOLEAN NOT NULL,
        `finance` BOOLEAN NOT NULL,
        `turists` BOOLEAN NOT NULL,
        `cars & transportation` BOOLEAN NOT NULL,
        `bars & alcohol` BOOLEAN NOT NULL,
        `other` BOOLEAN NOT NULL,
        `fashion` BOOLEAN NOT NULL,
        `real estate` BOOLEAN NOT NULL,
        
        PRIMARY KEY (`business_id`));
        """)

0L

In [7]:
#dim_dates table creation
cursor.execute("""
    CREATE TABLE IF NOT EXISTS dim_dates (       
        `date_id` INT(11) NOT NULL AUTO_INCREMENT,
        `date` DATE NOT NULL,
        `day` INT(11) NOT NULL,
        `month` INT(11) NOT NULL,
        `year` INT(11),
        `day_name` VARCHAR(45) NOT NULL,
        `holiday` VARCHAR(45),
        PRIMARY KEY (`date_id`));
        """)

0L

In [8]:
#fact_review table creation
cursor.execute("""
    CREATE TABLE IF NOT EXISTS fact_reviews (
        `review_id` INT(11) NOT NULL AUTO_INCREMENT,
        `yelp_review_id` VARCHAR(20) NOT NULL,
        `user_id` INT(11) NOT NULL,
        `business_id` INT(11) NOT NULL,
        `date_id` int(11) NOT NULL,
        `review_stars` FLOAT(11),
        `votes_aggregate` INT(11),
        PRIMARY KEY (`review_id`));
        """)

0L

In [9]:
#commit changes
con.commit()

### Closing the connection

In [10]:
#closing the connection
cursor.close()
con.close()

# 2. ETL

In [11]:
import petl as etl
import datetime

## Users

### Extract

In [12]:
# Extract JSON file and display
filename= 'yelp_academic_dataset_users_nofriendlist_PA.json'
t1 = etl.fromjson(filename)
#t1.display(10) #Checking

In [13]:
'''creating the users dimension:
user_id= varchar(20)
friends_count= INT
user_review_count= INT
fans= INT
is_elite= Binary
yelping_since= Date
average_stars = FLOAT'''

t2 = t1.cut(['user_id','review_count','fans','elite','yelping_since','average_stars'])
#t2.display(10) #Checking

#### Transform

In [14]:
#Check the file names and types - for inner testings
#fields = t2.fieldnames()
#for f in fields:
#    print f,'\t', t2.typecounter(f)

In [15]:
#convert unicode type to binary (creating is_elite field)
def to_binary(text):
    if text[3:7] == 'None':
        return 0 
    return 1

t3 = t2.convert('elite' , to_binary)
#t3.display(10) #Checking

In [16]:
#build the friends_count column by join (users and friends)

source = 'Pittsburgh_full_friend_text.json'
t4 = etl.fromjson(source)
t5 = t3.join(t4, # right table
                   lkey='user_id',rkey='user_id', #join equality columns
                   rprefix='t4_') # prefixes of columns from each table (not mandatory)

def friend_count(text):
    if text[0]=='[' and text[-1]==']':
        items =  text[1:-1].split(', ') # turn values to list
        if text[3:7]== 'None':
            return 0
        return len(items)

t6 = t5.convert('t4_friends' , friend_count)
#t6.display(10) #Checking

In [17]:
# adding new user ID and rename the table

t7 = t6.addrownumbers()
t_users = t7.rename({'row':'user_id','user_id':'yelp_user_id','elite':'is_elite','t4_friends':'friends_count', 'review_count':'user_review_count'})
#t_users.display(10) #Checking

In [18]:
#Validate the fields type - for inner testing
#fields = t_users.fieldnames()
#for f in fields:
#    print f,'\t', t_users.typecounter(f)

#### Load

In [19]:
#Connect to DB
try:
    con = mdb.connect(
                   host = '127.0.0.1', user = 'root', passwd = 'root', use_unicode=True, charset="utf8")
except:
    con = mdb.connect(
                   host = '127.0.0.1', user = 'root', passwd = '1234', use_unicode=True, charset="utf8")
# setting a cursor
cur = con.cursor()     # get the cursor

In [20]:
# append the data to existing tables
cur.execute('SET SQL_MODE=ANSI_QUOTES')
t_users.appenddb(cur,'dim_users',schema='yelp_pittsburgh',commit=True)

  r = r + self.execute(query, a)


In [21]:
#Closing the Cursor
cur.close()

## Business

### Extract

In [22]:
# Open and display the bussiness json file.
filename= 'yelp_academic_dataset_business_PA.json'
t1 = etl.fromjson(filename)
#t1.display(10) #Checking

In [23]:
#Check the types of the data - for inner testing
#fields = t1.fieldnames()
#for f in fields:
#    print f,'\t', t1.typecounter(f)

### Business Catagories
We wish to reduce the number of categories in Yelp to 12 main categories. In order to be able to get insights from those 12 categories, we will implement them as 12 Binary columns in our Business Dimention.


In [24]:
#Extract csv file- for mapping the categories
csv_filename = "category.csv"
map_categories_table = etl.fromcsv(csv_filename)
#map_categories_table.display(5) #Checking

In [25]:
#Create dictionary to map all categories to 10 main 
categories_dic = {}
for row in map_categories_table:
    categories_dic[row[0]] = row[1]

### Transform

In [26]:
#Order the fields (and display)
t3 = t1.cut(['business_id','name','is_open','neighborhood','latitude','longitude','stars','review_count','categories'])
#t3.display(1)

#### order and split the categories

In [27]:
#Create the function for the convertion that reduces the categories.
def reduce_categories(text):
    if text[0]=='[' and text[-1]==']':
        items =  text[1:-1].split(', ') # turn values to list
    else:
        return None
    output = {}
    for i in items:
        key = categories_dic.get(i[2:-1])
        value = 1
        output[key]=int(value)
    return output

#Inner Test
#sample = t3[1][8]
#print reduce_categories(sample)
#print type(reduce_categories(sample))

In [28]:
# Convert the data to reduce the categories
t4 = t3.convert('categories',reduce_categories)
t4.typecounter('categories')
#unpack the dictionary in category column to 12 seperate columns
t5 = t4.unpackdict('categories')
#t5.display(1) #Checking

In [29]:
# Order fileds and change category fields name from number to the real name
t6 = etl.rename(t5, {'1': 'food','2': 'art & enteraitment','3': 'stores','4': 'beauty & spa','5': 'health','6': 'finance','7': 'turists','8': 'cars & transportation', '9': 'bars & alcohol','10': 'other','11': 'fashion','12': 'real estate'})
t7 = t6.cut(['business_id','name','is_open','neighborhood','latitude','longitude','stars','review_count', 'food', 'art & enteraitment', 'stores', 'beauty & spa', 'health', 'finance', 'turists', 'cars & transportation', 'bars & alcohol', 'other', 'fashion', 'real estate'])
#t7.display(10) #Checking

In [30]:
# change None to 0 in the categories fields
def None_to_0(text):
    if text == None:
        return 0
    else:
        return text
t8 = etl.convert(t7, ('food', 'art & enteraitment', 'stores', 'beauty & spa', 'health', 'finance', 'turists', 'cars & transportation', 'bars & alcohol', 'other', 'fashion', 'real estate'), None_to_0)
#t8.display(10) #Checking

In [31]:
# add ID
t9 = t8.addrownumbers()
t_business = t9.rename({'row':'business_id', 'business_id':'yelp_business_id', 'name':'business_name', 'stars':'business_stars', 'review_count':'business_review_count'})
t_business.display()

business_id,yelp_business_id,business_name,is_open,neighborhood,latitude,longitude,business_stars,business_review_count,food,art & enteraitment,stores,beauty & spa,health,finance,turists,cars & transportation,bars & alcohol,other,fashion,real estate
1,cnGIivYRLxpF7tBVR_JwWA,Plush Salon and Spa,1,,40.4445439533,-80.1745398943,4.0,4,0,0,1,1,0,0,0,1,0,0,0,0
2,P3LisOj7DktgGa7C5FYpnA,Benjamin Franklin Plumbing,1,West View,40.5341627,-80.0498873,4.0,8,0,0,0,0,0,0,0,0,0,1,0,0
3,93otbGHE0s0m-lU1osvg9w,Rivertowne,1,North Side,40.4459861,-80.0108802,3.0,102,1,1,0,0,0,0,0,0,1,0,0,0
4,csdZolWIWvkIHIqsCmV6sg,City Vista,1,Greentree,40.4263863165,-80.037872133,2.0,5,0,1,0,0,0,0,1,0,0,1,0,1
5,kdjrQ2tuY4eqo4JZWIx50Q,The Scarehouse,1,Etna,40.499285,-79.944321,3.5,38,0,1,0,0,0,0,0,0,0,0,0,0


### Load

In [32]:
# Check field type - for inner testing
#fields = t_business.fieldnames()
#for f in fields:
#    print f,'\t', t_business.typecounter(f)

In [33]:
# setting a new cursor
cur = con.cursor()

In [34]:
cur.execute('USE yelp_Pittsburgh')
cur.execute('SET SQL_MODE=ANSI_QUOTES')
# append the data to existing tables
t_business.appenddb(cur,'dim_business',schema='yelp_pittsburgh', commit=True)

  r = r + self.execute(query, a)
  r = r + self.execute(query, a)


In [35]:
#Closing the Cursor - Do we need to close and open it every time?
cur.close()

## Date

In [36]:
# setting a cursor
cur = con.cursor()     # get the cursor

The dates dimention table will be creating with a stored procedure.

In [37]:
cur.execute("SET @currdate := '2005-01-01';")
cur.execute("SET @enddate := '2018-01-01';")
cur.execute("DROP PROCEDURE IF EXISTS BuildDate")
 
cur.execute(""" CREATE PROCEDURE BuildDate()
         BEGIN
        WHILE @currdate < @enddate DO
            INSERT INTO dim_dates  (date, day, month, year, day_name)
            VALUES (  @currdate,  DAY(@currdate),  MONTH(@currdate), 
              YEAR(@currdate), DAYNAME(@currdate));
            SET @currdate := DATE_ADD(@currdate, INTERVAL 1 DAY);
END WHILE;
    END  """)
 
cur.execute('CALL BuildDate();')
con.commit()


  app.launch_new_instance()


### Add Holidays

In [38]:
# New Year
cur.execute("""
update dim_dates 
set holiday = 'New Year''s Day'
where day = 01 and month = 01;
""")

# Halloween
cur.execute("""
update dim_dates 
set holiday = 'Halloween'
where day = 31 and month = 10;
""")

# Independence Day
cur.execute("""
update dim_dates 
set holiday = 'Independence Day, U.S'
where day = 04 and month = 07;
""")

# Memorial Day
cur.execute("""
update dim_dates 
set holiday = 'Memorial Day'
where date = '2005-05-30' or date = '2006-05-29' or date = '2007-05-28' or date = '2008-05-26' or date = '2009-05-25'
or date = '2010-05-31' or date = '2011-05-30' or date = '2012-05-28' or date = '2013-05-27' or date = '2014-05-26'
or date = '2015-05-25' or date = '2016-05-30' or date = '2017-05-29';
""")

# Thanksgiving
cur.execute("""
update dim_dates 
set holiday = 'Thanksgiving Day'
where date = '2005-11-23' or date = '2006-11-22' or date = '2007-11-28' or date = '2008-11-27' or date = '2009-11-25'
or date = '2010-11-24' or date = '2011-11-23' or date = '2012-11-22' or date = '2013-11-27' or date = '2014-11-26'
or date = '2015-11-25' or date = '2016-11-24' or date = '2017-11-23';
""")

# Christmas Day
cur.execute("""
update dim_dates 
set holiday = 'Christmas Day'
where day = 25 and month = 12;
""")

# Christmas Eve
cur.execute("""
update dim_dates 
set holiday = 'Christmas Eve'
where day = 24 and month = 12;
""")

# Black Friday
cur.execute("""
update dim_dates 
set holiday = 'Black Friday'
where day_name = 'Friday' and month = 11 and day between 21 and 29;
""")


16L

In [39]:
con.commit()
cur.close()

## Reviews

### Extract

In [40]:
filename= 'yelp_academic_dataset_review_drop_PA.json'

In [41]:
t1 = etl.fromjson(filename)

## Check field types - inner testing 
#fields = t1.fieldnames()

#for f in fields:
#    print f,'\t', t1.typecounter(f)

Due to the current types of data, we need to do the following:
 1. Add ascending numbers to the rows and call it "review_id" (this will be used for the auto-increamented review_ID field).
 2. Aggragate the columns `funny`, `cool` and `useful` to the "votes_aggregate" column.
 3. Change the user_id, business_id, date_id to their Foreign keys (in the DB). This will be done during the Load.


In [42]:
t2 = t1.addrownumbers()
#t2.display(10) #Checking

In [43]:
t5 = t2.rename({'row':'review_id','review_id':'yelp_review_id',  'user_id':'yelp_user_id', 'business_id':'yelp_business_id', 'stars':'review_stars'})
#t5.display(10) #Checking

In [44]:
def votes_aggregate(row):
    return row['funny']+row['cool']+row['useful']

t6=t5.addfield('votes_aggregate', votes_aggregate)
#t6.display(10) #Checking

In [45]:
t7 = t6.cut(['review_id','yelp_review_id', 'yelp_user_id', 'yelp_business_id', 'date', 'review_stars', 'votes_aggregate' ])
#t7.display(10)#Checking

#### add the Foreign Keys from the different dimentions table
Note: We are doing this with PETL, to reduce the runtime.

In [46]:
# Add the foriegn key - User ID 
t8 = t7.leftjoin(t_users.cut('user_id','yelp_user_id'),
                 lkey='yelp_user_id',rkey='yelp_user_id').cutout('yelp_user_id')

#t8.display(10) #Checking

In [47]:
# Add the foriegn key - Business ID
t9 = t8.leftjoin(t_business.cut('business_id','yelp_business_id'),
            lkey='yelp_business_id', rkey = 'yelp_business_id').cutout('yelp_business_id')

#t9.display(10) #Checking

In order to get the date ID, we use the table that was loaded to the DB.

In [48]:
t_dates = etl.fromdb(con, '''SELECT date_id, DATE_FORMAT(dim_dates.date, '%Y-%m-%d') AS date FROM dim_dates;''')
#t_dates.display(10) #Checking

In [49]:
# Add the foriegn key - Date ID
t10 = t9.leftjoin(t_dates,
            lkey='date', rkey = 'date').cutout('date')

#t10.display(10) #Checking

In [50]:
# setting a cursor
cur = con.cursor()

cur.execute('USE yelp_Pittsburgh')
cur.execute('SET SQL_MODE=ANSI_QUOTES')

# append the data to existing tables - loading by "chunks"
start = 0
end = 20000
row_num = etl.nrows(t10)
while end < row_num:
    t10.rowslice(start, end).appenddb(cur,'fact_reviews',schema='yelp_pittsburgh', commit=True)
    start = end
    end += 20000

    #Add the last chunk of data
t10.rowslice(start, row_num).appenddb(cur,'fact_reviews',schema='yelp_pittsburgh', commit=True)

  r = r + self.execute(query, a)


In [51]:
con.commit()
cur.close()