# Project Title
### Data Engineering Capstone Project

#### Project Summary
--describe your project at a high level--

The project follows the follow steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [2]:
# Do all imports and installs here
import pandas as pd
import pyspark.sql.functions as f
from IPython.display import display, HTML
data_path="./data/yelp"

### Step 1: Scope the Project and Gather Data

#### Scope 
Explain what you plan to do in the project in more detail. What data do you use? What is your end solution look like? What tools did you use? etc>

In this project we will be building analytics database using [Yelp dataset](https://www.yelp.com/dataset). This analytics table can be used to answer questions like: 
- How different discount, customer relationship programs, etc on reviews. So basically data analyst should be able to run a query and see how the reviews score changes during program time vs other times
- Have ability to fitler out negative(less than average review score for given business)reviews and find top used words/phrases
- Find top users that provided most value to the business using reviews/tips

#### Describe and Gather Data 
Describe the data sets you're using. Where did it come from? What type of information is included? 

Dataset contains a number of of newline delimeted json files.

#### Dataset description 
From [the dataset description](https://www.yelp.com/dataset/documentation/main)

Each file is composed of a single object type, one JSON-object per-line.

Take a look at some examples to get you started: https://github.com/Yelp/dataset-examples.

Note: the follow examples contain inline comments, which are technically not valid JSON. This is done here to simplify the documentation and explaining the structure, the JSON files you download will not contain any comments and will be fully valid JSON.

Sources:

- business.json - Contains business data including location data, attributes, and categories.

```json 
{
    // string, 22 character unique string business id
    "business_id": "tnhfDv5Il8EaGSXZGiuQGg",

    // string, the business's name
    "name": "Garaje",

    // string, the full address of the business
    "address": "475 3rd St",

    // string, the city
    "city": "San Francisco",

    // string, 2 character state code, if applicable
    "state": "CA",

    // string, the postal code
    "postal code": "94107",

    // float, latitude
    "latitude": 37.7817529521,

    // float, longitude
    "longitude": -122.39612197,

    // float, star rating, rounded to half-stars
    "stars": 4.5,

    // integer, number of reviews
    "review_count": 1198,

    //TODO: convert to boolean

    // integer, 0 or 1 for closed or open, respectively
    "is_open": 1,

    //TODO: do we need this?
    
    // object, business attributes to values. note: some attribute values might be objects
    "attributes": {
        "RestaurantsTakeOut": true,
        "BusinessParking": {
            "garage": false,
            "street": true,
            "validated": false,
            "lot": false,
            "valet": false
        },
    },

    // an array of strings of business categories
    "categories": [
        "Mexican",
        "Burgers",
        "Gastropubs"
    ],

    // an object of key day to value hours, hours are using a 24hr clock
    "hours": {
        "Monday": "10:00-21:00",
        "Tuesday": "10:00-21:00",
        "Friday": "10:00-21:00",
        "Wednesday": "10:00-21:00",
        "Thursday": "10:00-21:00",
        "Sunday": "11:00-18:00",
        "Saturday": "10:00-21:00"
    }
}
```
- review.json - Contains full review text data including the user_id that wrote the review and the business_id the review is written for.
```json
{
    // string, 22 character unique review id
    "review_id": "zdSx_SD6obEhz9VrW9uAWA",

    // string, 22 character unique user id, maps to the user in user.json
    "user_id": "Ha3iJu77CxlrFm-vQRs_8g",

    // string, 22 character business id, maps to business in business.json
    "business_id": "tnhfDv5Il8EaGSXZGiuQGg",

    // integer, star rating
    "stars": 4,

    // string, date formatted YYYY-MM-DD
    "date": "2016-03-09",

    // string, the review itself
    "text": "Great place to hang out after work: the prices are decent, and the ambience is fun. It's a bit loud, but very lively. The staff is friendly, and the food is good. They have a good selection of drinks.",

    //TODO: check min max for next values
    
    // integer, number of useful votes received
    "useful": 0,

    // integer, number of funny votes received
    "funny": 0,

    // integer, number of cool votes received
    "cool": 0
}
```
- user.json - User data including the user's friend mapping and all the metadata associated with the user.
```json
{
    // string, 22 character unique user id, maps to the user in user.json
    "user_id": "Ha3iJu77CxlrFm-vQRs_8g",

    // string, the user's first name
    "name": "Sebastien",

    // integer, the number of reviews they've written
    "review_count": 56,

    // string, when the user joined Yelp, formatted like YYYY-MM-DD
    "yelping_since": "2011-01-01",

    // array of strings, an array of the user's friend as user_ids
    "friends": [
        "wqoXYLWmpkEH0YvTmHBsJQ",
        "KUXLLiJGrjtSsapmxmpvTA",
        "6e9rJKQC3n0RSKyHLViL-Q"
    ],

    // integer, number of useful votes sent by the user
    "useful": 21,

    // integer, number of funny votes sent by the user
    "funny": 88,

    // integer, number of cool votes sent by the user
    "cool": 15,

    // integer, number of fans the user has
    "fans": 1032,

    // array of integers, the years the user was elite
    "elite": [
        2012,
        2013
    ],

    //TODO: do we need this?
    
    // float, average rating of all reviews
    "average_stars": 4.31,

    // integer, number of hot compliments received by the user
    "compliment_hot": 339,

    // integer, number of more compliments received by the user
    "compliment_more": 668,

    // integer, number of profile compliments received by the user
    "compliment_profile": 42,

    // integer, number of cute compliments received by the user
    "compliment_cute": 62,

    // integer, number of list compliments received by the user
    "compliment_list": 37,

    // integer, number of note compliments received by the user
    "compliment_note": 356,

    // integer, number of plain compliments received by the user
    "compliment_plain": 68,

    // integer, number of cool compliments received by the user
    "compliment_cool": 91,

    // integer, number of funny compliments received by the user
    "compliment_funny": 99,

    // integer, number of writer compliments received by the user
    "compliment_writer": 95,

    // integer, number of photo compliments received by the user
    "compliment_photos": 50
}
```

- checkin.json - Checkins on a business.
```json
{
    // string, 22 character business id, maps to business in business.json
    "business_id": "tnhfDv5Il8EaGSXZGiuQGg"

    // string which is a comma-separated list of timestamps for each checkin, each with format YYYY-MM-DD HH:MM:SS
    "date": "2016-04-26 19:49:16, 2016-08-30 18:36:57, 2016-10-15 02:45:18, 2016-11-18 01:54:50, 2017-04-20 18:39:06, 2017-05-03 17:58:02"
}
```
- tip.json - Tips written by a user on a business. Tips are shorter than reviews and tend to convey quick suggestions.
```json
{
    // string, text of the tip
    "text": "Secret menu - fried chicken sando is da bombbbbbb Their zapatos are good too.",

    // string, when the tip was written, formatted like YYYY-MM-DD
    "date": "2013-09-20",

    // integer, how many compliments it has
    "compliment_count": 172,

    // string, 22 character business id, maps to business in business.json
    "business_id": "tnhfDv5Il8EaGSXZGiuQGg",

    // string, 22 character unique user id, maps to the user in user.json
    "user_id": "49JhAJh8vSQ-vM4Aourl0g"
}
```

In [2]:
# Read in the data here

In [3]:
from pyspark.sql import SparkSession

%time
spark = SparkSession.builder.master("local[8]").enableHiveSupport().getOrCreate()

prev = spark.conf.get("spark.sql.execution.arrow.enabled")  # Keep its default value.
import warnings
warnings.filterwarnings("ignore")  # Ignore warnings coming from Arrow optimizations.

CPU times: user 2 µs, sys: 0 ns, total: 2 µs
Wall time: 4.53 µs


21/12/05 20:41:58 WARN Utils: Your hostname, babkamen-Lenovo resolves to a loopback address: 127.0.1.1; using 192.168.0.133 instead (on interface wlp3s0)
21/12/05 20:41:58 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
21/12/05 20:41:59 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
def info(df):
    display(df.info())
    
    print("Null values count:")
    print()
    c=df.isnull().sum()
    print(c[c>0])
#     TODO: add duplicated values count
    print()
    print("Statistics:")
    display(df.describe(include='all'))
    
    print("Head:")
    display(df.head())

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.


#### Businesses

In [5]:


%time
df=spark.read.json(f"{data_path}/yelp_academic_dataset_business.json")
df.createOrReplaceTempView("businesses")
df.cache()
info(df.toPandas())
# kdf=df.toPandas()

CPU times: user 2 µs, sys: 0 ns, total: 2 µs
Wall time: 5.25 µs


21/12/05 20:42:08 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 160585 entries, 0 to 160584
Data columns (total 14 columns):
 #   Column        Non-Null Count   Dtype  
---  ------        --------------   -----  
 0   address       160585 non-null  object 
 1   attributes    145593 non-null  object 
 2   business_id   160585 non-null  object 
 3   categories    160470 non-null  object 
 4   city          160585 non-null  object 
 5   hours         133244 non-null  object 
 6   is_open       160585 non-null  int64  
 7   latitude      160585 non-null  float64
 8   longitude     160585 non-null  float64
 9   name          160585 non-null  object 
 10  postal_code   160585 non-null  object 
 11  review_count  160585 non-null  int64  
 12  stars         160585 non-null  float64
 13  state         160585 non-null  object 
dtypes: float64(3), int64(2), object(9)
memory usage: 17.2+ MB


None

Null values count:

attributes    14992
categories      115
hours         27341
dtype: int64

Statistics:


Unnamed: 0,address,attributes,business_id,categories,city,hours,is_open,latitude,longitude,name,postal_code,review_count,stars,state
count,160585.0,145593,160585,160470,160585,133244,160585.0,160585.0,160585.0,160585,160585.0,160585.0,160585.0,160585
unique,123895.0,67907,160585,88115,836,50857,,,,125850,5779.0,,,31
top,,"(None, None, None, None, None, None, None, Non...",6iYb2HFDywm3zjuRg0shjw,"Beauty & Spas, Hair Salons",Austin,"(0:0-0:0, 0:0-0:0, 0:0-0:0, 0:0-0:0, 0:0-0:0, ...",,,,Starbucks,78704.0,,,MA
freq,6726.0,9316,1,757,22416,5708,,,,852,2084.0,,,36012
mean,,,,,,,0.767494,38.759794,-94.266212,,,51.964548,3.656954,
std,,,,,,,0.422431,7.138042,19.975446,,,130.030448,0.943604,
min,,,,,,,0.0,27.998972,-123.393929,,,5.0,1.0,
25%,,,,,,,1.0,30.355886,-122.589583,,,8.0,3.0,
50%,,,,,,,1.0,42.177366,-84.383281,,,17.0,4.0,
75%,,,,,,,1.0,45.458531,-81.288501,,,44.0,4.5,


Head:


Unnamed: 0,address,attributes,business_id,categories,city,hours,is_open,latitude,longitude,name,postal_code,review_count,stars,state
0,921 Pearl St,"(None, None, 'beer_and_wine', {'touristy': Fal...",6iYb2HFDywm3zjuRg0shjw,"Gastropubs, Food, Beer Gardens, Restaurants, B...",Boulder,"(11:0-23:0, 11:0-23:0, 11:0-23:0, 11:0-23:0, 1...",1,40.017544,-105.283348,Oskar Blues Taproom,80302,86,4.0,CO
1,7000 NE Airport Way,"(None, None, u'beer_and_wine', {'romantic': Fa...",tCbdrRPZA0oiIYSmHG3J0w,"Salad, Soup, Sandwiches, Delis, Restaurants, C...",Portland,"(5:0-18:0, 5:0-18:0, 5:0-18:0, 5:0-18:0, 5:0-1...",1,45.588906,-122.593331,Flying Elephants at PDX,97218,126,4.0,OR
2,4720 Hawthorne Ave,"(None, None, None, None, None, None, None, Fal...",bvN78flM8NLprQ1a1y5dRg,"Antiques, Fashion, Used, Vintage & Consignment...",Portland,"(11:0-18:0, None, 11:0-18:0, 11:0-18:0, 11:0-1...",1,45.511907,-122.613693,The Reclaimory,97214,13,4.5,OR
3,2566 Enterprise Rd,"(None, None, None, None, None, None, None, Non...",oaepsyvc0J17qwi8cfrOWg,"Beauty & Spas, Hair Salons",Orange City,,1,28.914482,-81.295979,Great Clips,32763,8,3.0,FL
4,1046 Memorial Dr SE,"(None, None, None, None, None, None, None, Non...",PE9uqAjdw0E4-8mjGl3wVA,"Gyms, Active Life, Interval Training Gyms, Fit...",Atlanta,"(16:0-19:0, 16:0-19:0, 9:0-11:0, None, 16:0-19...",1,33.747027,-84.353424,Crossfit Terminus,30316,14,4.0,GA


#### Reviews


In [6]:
df=spark.read.json(f"{data_path}/yelp_academic_dataset_review.json")
df.createOrReplaceTempView("reviews")

                                                                                

In [7]:
# https://stackoverflow.com/questions/44413132/count-the-number-of-missing-values-in-a-dataframe-spark
from pyspark.sql.functions import col,sum
df.select(*(sum(col(c).isNull().cast("int")).alias(c) for c in df.columns)).toPandas()


                                                                                

Unnamed: 0,business_id,cool,date,funny,review_id,stars,text,useful,user_id
0,0,0,0,0,0,0,0,0,0


#### Count empty values 

In [8]:
#https://towardsdatascience.com/data-prep-with-spark-dataframes-3629478a1041
df.select([f.count(f.when(f.isnan(c), c)).alias(c) for c in df.columns]).toPandas()

                                                                                

Unnamed: 0,business_id,cool,date,funny,review_id,stars,text,useful,user_id
0,0,0,0,0,0,0,0,0,0


#### Count nulls

In [9]:
df.select([f.count(f.when(f.col(c).isNull(), c)).alias(c) for c in 
           df.columns]).toPandas().T

                                                                                

Unnamed: 0,0
business_id,0
cool,0
date,0
funny,0
review_id,0
stars,0
text,0
useful,0
user_id,0


In [10]:
# df.filter(f.isnan("")).toPandas()

In [11]:
"{:,}".format(df.count())

                                                                                

'8,635,403'

In [12]:
spark.sql("SELECT * FROM reviews WHERE text='' LIMIT 10").toPandas()

                                                                                

Unnamed: 0,business_id,cool,date,funny,review_id,stars,text,useful,user_id


In [13]:
df.limit(5).toPandas().head()

Unnamed: 0,business_id,cool,date,funny,review_id,stars,text,useful,user_id
0,buF9druCkbuXLX526sGELQ,1,2014-10-11 03:34:02,1,lWC-xP3rd6obsecCYsGZRg,4.0,Apparently Prides Osteria had a rough summer a...,3,ak0TdVmGKo4pwqdJSTLwWw
1,RA4V8pr014UyUbDvI-LW2A,0,2015-07-03 20:38:25,0,8bFej1QE5LXp4O05qjGqXA,4.0,This store is pretty good. Not as great as Wal...,1,YoVfDbnISlW0f7abNQACIg
2,_sS2LBIGNT5NQb6PD1Vtjw,0,2013-05-28 20:38:06,0,NDhkzczKjLshODbqDoNLSg,5.0,I called WVM on the recommendation of a couple...,0,eC5evKn1TWDyHCyQAwguUw
3,0AzLzHfOJgL7ROwhdww2ew,1,2010-01-08 02:29:15,1,T5fAqjjFooT4V0OeZyuk1w,2.0,I've stayed at many Marriott and Renaissance M...,1,SFQ1jcnGguO0LYWnbbftAA
4,8zehGz9jnxPqXtOc7KaJxA,0,2011-07-28 18:05:01,0,sjm_uUcQVxab_EeLCqsYLg,4.0,The food is always great here. The service fro...,0,0kA0PAJ8QFMeveQWHFqz2A


#### Users

In [14]:
%%time
df=spark.read.json(f"{data_path}/yelp_academic_dataset_user.json")
users=df



CPU times: user 15 ms, sys: 224 µs, total: 15.2 ms
Wall time: 8.59 s




In [15]:
#https://stackoverflow.com/questions/3154460/python-human-readable-large-numbers

"{:,}".format(df.count())

                                                                                

'2,189,457'

In [16]:
df.summary().toPandas()

                                                                                

Unnamed: 0,summary,average_stars,compliment_cool,compliment_cute,compliment_funny,compliment_hot,compliment_list,compliment_more,compliment_note,compliment_photos,...,cool,elite,fans,friends,funny,name,review_count,useful,user_id,yelping_since
0,count,2189457.0,2189457.0,2189457.0,2189457.0,2189457.0,2189457.0,2189457.0,2189457.0,2189457.0,...,2189457.0,2189457.0,2189457.0,2189457,2189457.0,2189457,2189457.0,2189457.0,2189457,2189457
1,mean,3.653816110569928,2.5026232531627706,0.1304570037228408,2.5026232531627706,1.633913340156943,0.0612740967280928,0.2736518689337128,1.2354332603928735,0.9881007025942962,...,20.473540699817352,2014.6331140977984,1.3792186829885218,,15.39467959407287,,21.697721398502004,38.05667295589728,,
2,stddev,1.1538609330757066,83.63695759997829,10.767452899622612,83.63695759997829,64.40826658665287,9.473195831748187,11.99887384468362,39.82064066188779,87.43188645190922,...,466.82963889748737,3.651799459305662,16.866749723244446,,353.269747281276,,76.01254770183907,535.2625345401981,,
3,min,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,,0.0,"---A_S9GsLdfLSURLx6-Dw, jT8iJYsTY8-aD91zSlxfMg...",0.0,Patrizia,0.0,0.0,---2PmXbF47D870stH1jqA,2004-10-12 08:46:11
4,25%,3.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,2012.0,0.0,,0.0,98.0,2.0,1.0,,
5,50%,3.88,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,2016.0,0.0,,0.0,,5.0,3.0,,
6,75%,4.55,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,3.0,2018.0,0.0,,3.0,,15.0,13.0,,
7,max,5.0,46858.0,13654.0,46858.0,25304.0,12669.0,13501.0,38322.0,82630.0,...,198451.0,20192020.0,12116.0,"zzzcuxFaP_FvdIB-fbP9iA, wgyzGQ9LM8oedgve16uU5g...",172041.0,ｼﾞｪﾚﾐｰ,15686.0,204380.0,zzzqnB-6DlYUbqAPxUxg4A,2021-01-28 15:32:55


In [17]:
df.limit(5).toPandas().head()

Unnamed: 0,average_stars,compliment_cool,compliment_cute,compliment_funny,compliment_hot,compliment_list,compliment_more,compliment_note,compliment_photos,compliment_plain,...,cool,elite,fans,friends,funny,name,review_count,useful,user_id,yelping_since
0,3.85,2541,361,2541,1710,147,163,1212,323,5691,...,11291,200620072008200920102011201220132014,1357,"xBDpTUbai0DXrvxCe3X16Q, 7GPNBO496aecrjJfW6UWtg...",10030,Jane,1220,15038,q_QQ5kBBwlCcbL1s4NVK3g,2005-03-14 20:26:35
1,4.09,2205,232,2205,1632,96,87,1187,294,3293,...,18046,"2007,2008,2009,2010,2011,2012,2013,2014,2015,2...",1025,"XPzYf9_mwG2eXYP2BAGSTA, 2LooM5dcIk2o01nftYdPIg...",10289,Gabi,2136,21272,dIIKEfOgo0KqUfGQvGikPg,2007-08-10 19:01:51
2,3.76,31,0,31,22,0,1,5,1,20,...,130,20102011,16,"GfB6sC4NJQvSI2ewbQrDNA, jhZtzZNNZJOU2YSZ6jPlXQ...",128,Jason,119,188,D6ErcUnFALnCQN4b1W_TlA,2007-02-07 15:47:53
3,3.77,1566,219,1566,1180,90,129,1120,326,4510,...,4035,200920102011201220132014,420,"HQZPQhKMwRAyS6BCselVWQ, kP2U1s_sjQfHO9grxiyDTA...",4722,Kat,987,7234,JnPIjvC0cmooNDfsa9BmXg,2009-02-09 16:14:29
4,3.72,310,16,310,248,15,19,77,44,131,...,1124,200920102011,47,"-Q88pZUcrfN0BLBDp-bkAQ, etPn4Pv1Gc4cRZjRgB_BOw...",727,Christine,495,1577,37Hc8hr3cw0iHLoPzLK6Ow,2008-03-03 04:57:05


#### Check ins

In [5]:
import pandas as pd

df = pd.read_json(f"{data_path}/yelp_academic_dataset_checkin.json", lines=True)

In [7]:
df.date.str.len().max()

3150103

#### Cleaning Steps
Document steps necessary to clean the data

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model

#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [18]:
# Write code here

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

In [19]:
# Perform quality checks here

#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

#### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.

# Export to postgres

In [None]:
import glob
for path in glob.glob(f"{data_path}/*.json"):
    print(path)
    print(path[path.rindex("/")+1:path.rindex(".")])

In [1]:
from pyspark.sql import SparkSession
# spark.stop()

data_path="./data/yelp"
spark = SparkSession.builder.master("local[7]").getOrCreate()


print(spark.sparkContext.getConf().getAll())
mode = "overwrite"
url = "jdbc:postgresql://127.0.0.1:5432/dwh"
properties = {"user": "airflow","password": "airflow","driver": "org.postgresql.Driver"}

# for path in glob.glob(f"{data_path}/*.json"):
#     tbl=path[path.rindex("/")+1:path.rindex(".")]
#     spark.read.json(path).write.jdbc(url=url, table=tbl, mode=mode, properties=properties)



df = spark.read.json(f"{data_path}/yelp_academic_dataset_checkin.json")


spark.read.json(f"{data_path}/yelp_academic_dataset_user.json").write.jdbc(url=url, table="users", mode=mode, properties=properties)

spark.read.json(f"{data_path}/yelp_academic_dataset_review.json").write.jdbc(url=url, table="reviews", mode=mode, properties=properties)

df= spark.read.json(f"{data_path}/yelp_academic_dataset_business.json").drop("attributes")
df.printSchema()
df.drop('hours').write.jdbc(url=url, table="businesses", mode=mode, properties=properties)



21/12/12 21:59:12 WARN Utils: Your hostname, babkamen-Lenovo resolves to a loopback address: 127.0.1.1; using 192.168.0.133 instead (on interface wlp3s0)
21/12/12 21:59:12 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
21/12/12 21:59:13 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


[('spark.driver.port', '34593'), ('spark.master', 'local[7]'), ('spark.rdd.compress', 'True'), ('spark.serializer.objectStreamReset', '100'), ('spark.app.startTime', '1639339153260'), ('spark.driver.host', '192.168.0.133'), ('spark.submit.pyFiles', ''), ('spark.executor.id', 'driver'), ('spark.submit.deployMode', 'client'), ('spark.sql.warehouse.dir', 'file:/home/babkamen/git/udacity-big-data-engineer-nanodegree/capstone-project/spark-warehouse'), ('spark.ui.showConsoleProgress', 'true'), ('spark.app.name', 'pyspark-shell'), ('spark.app.id', 'local-1639339154426')]




Py4JJavaError: An error occurred while calling o81.jdbc.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 24 in stage 3.0 failed 1 times, most recent failure: Lost task 24.0 in stage 3.0 (TID 132) (192.168.0.133 executor driver): java.sql.BatchUpdateException: Batch entry 512 INSERT INTO reviews ("business_id","cool","date","funny","review_id","stars","text","useful","user_id") VALUES ('qCyDatIL1MNeSo12WJEZjg',3,'2007-11-13 22:39:38',5,'T2vUdvUZ_dY76S4xpmxz6A',5.0,'I''ve recently become the Laurelhurst''s biggest fan. Pleased to meet you.

For years have I shirked going to the pub theaters. They seemed cheesy and gimmicky...I''m judgmental and hardheaded, I suppose.  
It was foolishness! Beer and movies?! In the same building? I was living a total lie.
The Laurelhurst is the jam.  Yes, beer and movies.  Also...they''re good movies, you guys. Movies I actually want to see. Sorry, I''m not much of a heavy-handed-overwrought-blockbuster gal. Old and new. For young and old. So nice!
There''s also pizza. I really like pizza.

And it costs me $3.  I can totally and completely afford that.

I watched Purple Rain here. I like that. Period!',2,'h7DOfAaP3b_6lp__qVxSiA') was aborted: ERROR: could not extend file "base/25088/443761.2": wrote only 4096 of 8192 bytes at block 324405
  Hint: Check free disk space.  Call getNextException to see other errors in the batch.
	at org.postgresql.jdbc.BatchResultHandler.handleError(BatchResultHandler.java:165)
	at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2365)
	at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2097)
	at org.postgresql.core.v3.QueryExecutorImpl.flushIfDeadlockRisk(QueryExecutorImpl.java:1454)
	at org.postgresql.core.v3.QueryExecutorImpl.sendQuery(QueryExecutorImpl.java:1479)
	at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:544)
	at org.postgresql.jdbc.PgStatement.internalExecuteBatch(PgStatement.java:881)
	at org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:904)
	at org.postgresql.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1634)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:723)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$saveTable$1(JdbcUtils.scala:890)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$saveTable$1$adapted(JdbcUtils.scala:888)
	at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2(RDD.scala:1020)
	at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2$adapted(RDD.scala:1020)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2254)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.postgresql.util.PSQLException: ERROR: could not extend file "base/25088/443761.2": wrote only 4096 of 8192 bytes at block 324405
  Hint: Check free disk space.
	at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2674)
	at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2364)
	... 21 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2403)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2352)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2351)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2351)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1109)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1109)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1109)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2591)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2533)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2522)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:898)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2214)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2235)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2254)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2279)
	at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$1(RDD.scala:1020)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
	at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:1018)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.saveTable(JdbcUtils.scala:888)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:81)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:110)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:110)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:106)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:481)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:481)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:457)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:106)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:93)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:91)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:128)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:848)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:382)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:355)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:247)
	at org.apache.spark.sql.DataFrameWriter.jdbc(DataFrameWriter.scala:745)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.sql.BatchUpdateException: Batch entry 512 INSERT INTO reviews ("business_id","cool","date","funny","review_id","stars","text","useful","user_id") VALUES ('qCyDatIL1MNeSo12WJEZjg',3,'2007-11-13 22:39:38',5,'T2vUdvUZ_dY76S4xpmxz6A',5.0,'I''ve recently become the Laurelhurst''s biggest fan. Pleased to meet you.

For years have I shirked going to the pub theaters. They seemed cheesy and gimmicky...I''m judgmental and hardheaded, I suppose.  
It was foolishness! Beer and movies?! In the same building? I was living a total lie.
The Laurelhurst is the jam.  Yes, beer and movies.  Also...they''re good movies, you guys. Movies I actually want to see. Sorry, I''m not much of a heavy-handed-overwrought-blockbuster gal. Old and new. For young and old. So nice!
There''s also pizza. I really like pizza.

And it costs me $3.  I can totally and completely afford that.

I watched Purple Rain here. I like that. Period!',2,'h7DOfAaP3b_6lp__qVxSiA') was aborted: ERROR: could not extend file "base/25088/443761.2": wrote only 4096 of 8192 bytes at block 324405
  Hint: Check free disk space.  Call getNextException to see other errors in the batch.
	at org.postgresql.jdbc.BatchResultHandler.handleError(BatchResultHandler.java:165)
	at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2365)
	at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2097)
	at org.postgresql.core.v3.QueryExecutorImpl.flushIfDeadlockRisk(QueryExecutorImpl.java:1454)
	at org.postgresql.core.v3.QueryExecutorImpl.sendQuery(QueryExecutorImpl.java:1479)
	at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:544)
	at org.postgresql.jdbc.PgStatement.internalExecuteBatch(PgStatement.java:881)
	at org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:904)
	at org.postgresql.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1634)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:723)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$saveTable$1(JdbcUtils.scala:890)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$saveTable$1$adapted(JdbcUtils.scala:888)
	at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2(RDD.scala:1020)
	at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2$adapted(RDD.scala:1020)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2254)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	... 1 more
Caused by: org.postgresql.util.PSQLException: ERROR: could not extend file "base/25088/443761.2": wrote only 4096 of 8192 bytes at block 324405
  Hint: Check free disk space.
	at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2674)
	at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2364)
	... 21 more


In [2]:
from pyspark.sql import SparkSession
# spark.stop()

data_path="./data/yelp"
spark = SparkSession.builder.master("local[7]").getOrCreate()


print(spark.sparkContext.getConf().getAll())
mode = "overwrite"
url = "jdbc:postgresql://127.0.0.1:5432/dwh"
properties = {"user": "airflow","password": "airflow","driver": "org.postgresql.Driver"}

# for path in glob.glob(f"{data_path}/*.json"):
#     tbl=path[path.rindex("/")+1:path.rindex(".")]
#     spark.read.json(path).write.jdbc(url=url, table=tbl, mode=mode, properties=properties)



df = spark.read.json(f"{data_path}/yelp_academic_dataset_checkin.json")
from pyspark.sql.functions import split,explode, ltrim
# df.show()
df2= df.selectExpr("business_id", "explode(split(date,',')) as date") .selectExpr("business_id", "ltrim(date) as date")
df2.show(20, False) 
df2.coalesce(1).write.format('json').mode("overwrite").save('./check_ins')

21/12/15 21:15:34 WARN Utils: Your hostname, babkamen-Lenovo resolves to a loopback address: 127.0.1.1; using 192.168.0.133 instead (on interface wlp3s0)
21/12/15 21:15:34 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
21/12/15 21:15:35 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


[('spark.app.startTime', '1639595735187'), ('spark.master', 'local[7]'), ('spark.rdd.compress', 'True'), ('spark.app.id', 'local-1639595736239'), ('spark.serializer.objectStreamReset', '100'), ('spark.driver.host', '192.168.0.133'), ('spark.submit.pyFiles', ''), ('spark.executor.id', 'driver'), ('spark.submit.deployMode', 'client'), ('spark.sql.warehouse.dir', 'file:/home/babkamen/git/udacity-big-data-engineer-nanodegree/capstone-project/spark-warehouse'), ('spark.ui.showConsoleProgress', 'true'), ('spark.app.name', 'pyspark-shell'), ('spark.driver.port', '42539')]


                                                                                

In [2]:
from notebook.services.config import ConfigManager
cm = ConfigManager().update('notebook', {'limit_output': 50000})

In [6]:
from pyspark.sql import SparkSession
# spark.stop()
from pyspark.sql.functions import col
data_path="./data/yelp"
spark = SparkSession.builder.master("local[7]").getOrCreate()



df = spark.read.json(f"{data_path}/yelp_academic_dataset_user.json")

# from pyspark.sql.functions import split,explode, ltrim
# # df.show()
df_users=df.selectExpr("user_id").alias("u")
df_friends= df.filter(df.friends!="None").selectExpr("user_id", "explode(split(friends,',')) as friend_id") .selectExpr("user_id", "ltrim(friend_id) as friend_id").alias("f")
df_friends = df_friends.join(df_users, col("f.friend_id") == col("u.user_id")).drop("u.user_id")
df_friends.createOrReplaceTempView("users")
spark.sql("SELECT COUNT(*) FROM users").show()
df_friends.show()

# df2.show(20, False) 
# df2.coalesce(1).write.format('json').mode("overwrite").save('./friends')

                                                                                

+--------+
|count(1)|
+--------+
|17971548|
+--------+



[Stage 40:>                                                         (0 + 1) / 1]

+--------------------+--------------------+--------------------+
|             user_id|           friend_id|             user_id|
+--------------------+--------------------+--------------------+
|Wo_ASbM3pzJ4r1Qf6...|--WdohYHcU0CtA6gB...|--WdohYHcU0CtA6gB...|
|fyQ_PV1z7TsUTOUKA...|--WdohYHcU0CtA6gB...|--WdohYHcU0CtA6gB...|
|0j-MgFIeN4PAW0Zab...|--WdohYHcU0CtA6gB...|--WdohYHcU0CtA6gB...|
|LgqaNO1mXwWncSfmS...|--WdohYHcU0CtA6gB...|--WdohYHcU0CtA6gB...|
|JJukAvHwwV_tVycod...|--WdohYHcU0CtA6gB...|--WdohYHcU0CtA6gB...|
|jsAyVCvJJm6m7FNPR...|--WdohYHcU0CtA6gB...|--WdohYHcU0CtA6gB...|
|ag7t0ohT-T5rkVkGf...|--WdohYHcU0CtA6gB...|--WdohYHcU0CtA6gB...|
|bMlZPO4ifuwSR-clu...|--WdohYHcU0CtA6gB...|--WdohYHcU0CtA6gB...|
|xqJrrN26uzjbMgPip...|--WdohYHcU0CtA6gB...|--WdohYHcU0CtA6gB...|
|wpr6gw4FnptTrk1Ce...|--WdohYHcU0CtA6gB...|--WdohYHcU0CtA6gB...|
|-XZOz3ViFET3IZFRG...|--WdohYHcU0CtA6gB...|--WdohYHcU0CtA6gB...|
|Ad3n5RVoy1uYuzwB2...|--z9XJZF0T2r7aIsZ...|--z9XJZF0T2r7aIsZ...|
|es9BFjQvNnhCfRR5F...|-0G

                                                                                

In [None]:
spark.stop()