In [41]:
import pyspark
import pyspark.sql.functions as F
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.ml.stat import Correlation

#### Requêtes SQL 

In [42]:
print(pyspark.__version__)

3.2.1


In [43]:
conf = pyspark.SparkConf().set("spark.jars.packages", 
                                "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1").setMaster("local").setAppName("My App").setAll([("spark.driver.memory", "40g"), ("spark;executor.memory", "50g")])

In [44]:
sc = SparkContext(conf=conf) #Connected to pyspark

In [45]:
sqlC = SQLContext(sc)



In [46]:
mongo_ip = "mongodb://localhost:27017/restaurantsdb."

In [47]:
restaurants = sqlC.read.format("com.mongodb.spark.sql.DefaultSource").option("uri", mongo_ip + "restaurants").load()

In [48]:
restaurants.createOrReplaceTempView("restaurants")

In [49]:
restaurants = sqlC.sql("SELECT * FROM restaurants")

In [50]:
restaurants.show()

+--------------------+--------------------+-------------+--------------------+--------------------+--------------------+-------------+
|                 _id|             address|      borough|             cuisine|              grades|                name|restaurant_id|
+--------------------+--------------------+-------------+--------------------+--------------------+--------------------+-------------+
|{61f15a93e0cb0c13...|{1007, [-73.85607...|        Bronx|              Bakery|[{2014-03-03 01:0...|Morris Park Bake ...|     30075445|
|{61f15a93e0cb0c13...|{469, [-73.961704...|     Brooklyn|          Hamburgers|[{2014-12-30 01:0...|             Wendy'S|     30112340|
|{61f15a93e0cb0c13...|{351, [-73.985135...|    Manhattan|               Irish|[{2014-09-06 02:0...|Dj Reynolds Pub A...|     30191841|
|{61f15a93e0cb0c13...|{2780, [-73.98241...|     Brooklyn|           American |[{2014-06-10 02:0...|     Riviera Caterer|     40356018|
|{61f15a93e0cb0c13...|{97-22, [-73.8601...|       Queen

In [51]:
# Nombre de données dans la table
restaurants = sqlC.sql("SELECT COUNT(*) FROM restaurants")
restaurants.show()

+--------+
|count(1)|
+--------+
|    3772|
+--------+



In [52]:
# Nombre de restaurants par code postal
restaurants = sqlC.sql("SELECT address.zipcode, COUNT(restaurant_id) FROM restaurants GROUP BY address.zipcode")
restaurants.show()

+-------+--------------------+
|zipcode|count(restaurant_id)|
+-------+--------------------+
|  11205|                  10|
|  11236|                  11|
|  10309|                  13|
|  11106|                  27|
|  11218|                  16|
|  10452|                  11|
|  11428|                   5|
|  11237|                   9|
|  11379|                  11|
|  11364|                  10|
|  11249|                   8|
|  10012|                  94|
|  11001|                   2|
|  11385|                  34|
|  11238|                  12|
|  10039|                   4|
|  11427|                   5|
|  11367|                  10|
|  10010|                  27|
|  10038|                  18|
+-------+--------------------+
only showing top 20 rows



In [53]:
# Nombre de restaurants par type de cuisine
restaurants = sqlC.sql("SELECT cuisine, COUNT(restaurant_id) FROM restaurants GROUP BY cuisine")
restaurants.show()

+----------------+--------------------+
|         cuisine|count(restaurant_id)|
+----------------+--------------------+
|Pancakes/Waffles|                   7|
|Chinese/Japanese|                   1|
|         Mexican|                  73|
|   Jewish/Kosher|                  60|
|          Bakery|                 127|
|         Turkish|                  11|
|        Armenian|                   1|
|         Hotdogs|                   4|
|       Ethiopian|                   3|
|            Thai|                  14|
|          Indian|                  43|
|         Chinese|                 115|
|      Indonesian|                   2|
|       Soul Food|                   6|
|     Continental|                   8|
|           Steak|                  21|
|         African|                   4|
|CafÃ©/Coffee/Tea|                   1|
|          Donuts|                  43|
|           Tapas|                   4|
+----------------+--------------------+
only showing top 20 rows



In [54]:
# Le restaurant le mieux noté
from pyspark.sql.functions import *

restaurants = sqlC.sql("select *,aggregate(grades.score,0,(x,y) -> x + y) as sum from restaurants")
restaurants.show()

+--------------------+--------------------+-------------+--------------------+--------------------+--------------------+-------------+---+
|                 _id|             address|      borough|             cuisine|              grades|                name|restaurant_id|sum|
+--------------------+--------------------+-------------+--------------------+--------------------+--------------------+-------------+---+
|{61f15a93e0cb0c13...|{1007, [-73.85607...|        Bronx|              Bakery|[{2014-03-03 01:0...|Morris Park Bake ...|     30075445| 41|
|{61f15a93e0cb0c13...|{469, [-73.961704...|     Brooklyn|          Hamburgers|[{2014-12-30 01:0...|             Wendy'S|     30112340| 55|
|{61f15a93e0cb0c13...|{351, [-73.985135...|    Manhattan|               Irish|[{2014-09-06 02:0...|Dj Reynolds Pub A...|     30191841| 37|
|{61f15a93e0cb0c13...|{2780, [-73.98241...|     Brooklyn|           American |[{2014-06-10 02:0...|     Riviera Caterer|     40356018| 36|
|{61f15a93e0cb0c13...|{97-2

### Preprocessing

In [55]:
restaurants_df = sqlC.read.format("com.mongodb.spark.sql.DefaultSource").option("uri", mongo_ip + "restau_df").load()

In [56]:
restaurants_df.show()

+--------------------+--------------------+-----------------+------------------+-----+-------------+-------+
|                 _id|             cuisine|        lattitude|         longitude|notes|     quartier|zipcode|
+--------------------+--------------------+-----------------+------------------+-----+-------------+-------+
|{61f15a93e0cb0c13...|              Bakery|        40.848447|        -73.856077|    2|        Bronx|  10462|
|{61f15a93e0cb0c13...|          Hamburgers|        40.662942|        -73.961704|    8|     Brooklyn|  11225|
|{61f15a93e0cb0c13...|               Irish|       40.7676919|-73.98513559999999|    2|    Manhattan|  10019|
|{61f15a93e0cb0c13...|           American |        40.579505|-73.98241999999999|    5|     Brooklyn|  11224|
|{61f15a93e0cb0c13...|       Jewish/Kosher|       40.7311739|       -73.8601152|   20|       Queens|  11374|
|{61f15a93e0cb0c13...|           American |       40.7643124|       -73.8803827|   38|       Queens|  11369|
|{61f15a93e0cb0c13.

In [57]:
restaurants_df.printSchema()

root
 |-- _id: struct (nullable = true)
 |    |-- oid: string (nullable = true)
 |-- cuisine: string (nullable = true)
 |-- lattitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- notes: integer (nullable = true)
 |-- quartier: string (nullable = true)
 |-- zipcode: string (nullable = true)



In [58]:
restaurants_df.describe().show()

+-------+--------------------+------------------+------------------+------------------+-------------+-----------------+
|summary|             cuisine|         lattitude|         longitude|             notes|     quartier|          zipcode|
+-------+--------------------+------------------+------------------+------------------+-------------+-----------------+
|  count|                3772|              3772|              3772|              3772|         3772|             3772|
|   mean|                null| 40.67625120771491|-73.96846774331894|10.701219512195122|         null|10545.26802757158|
| stddev|                null|1.6256337301241979| 4.953221264543527|  5.98845956338147|         null| 588.991762883543|
|    min|              Afghan|       -28.0168595|      -119.6368672|                -1|        Bronx|            10001|
|    max|Vietnamese/Cambod...|        51.6514664|       153.1628795|                89|Staten Island|            11697|
+-------+--------------------+----------

In [59]:
import pymongo
import pandas as pd
from pymongo import MongoClient

#MongoDB collection to Pandas dataframe
c = MongoClient()
db = c.restaurantsdb
input_data = db.restau_df
df = pd.DataFrame(list(input_data.find()))
df

Unnamed: 0,_id,longitude,lattitude,zipcode,cuisine,quartier,notes
0,61f15a93e0cb0c134b208f69,-73.856077,40.848447,10462,Bakery,Bronx,2
1,61f15a93e0cb0c134b208f6a,-73.961704,40.662942,11225,Hamburgers,Brooklyn,8
2,61f15a93e0cb0c134b208f6b,-73.985136,40.767692,10019,Irish,Manhattan,2
3,61f15a93e0cb0c134b208f6c,-73.982420,40.579505,11224,American,Brooklyn,5
4,61f15a93e0cb0c134b208f6d,-73.860115,40.731174,11374,Jewish/Kosher,Queens,20
...,...,...,...,...,...,...,...
3767,61f15a94e0cb0c134b209e20,-73.986544,40.733770,10003,Café/Coffee/Tea,Manhattan,5
3768,61f15a94e0cb0c134b209e21,-73.946224,40.656571,11203,Café/Coffee/Tea,Brooklyn,3
3769,61f15a94e0cb0c134b209e22,-73.983944,40.756111,10036,Japanese,Manhattan,7
3770,61f15a94e0cb0c134b209e23,-73.986652,40.753641,10018,Sandwiches/Salads/Mixed Buffet,Manhattan,5


In [60]:
df['cuisine'].value_counts()

American            1255
Italian              325
Pizza                270
Café/Coffee/Tea      180
Hamburgers           159
                    ... 
CafÃ©/Coffee/Tea       1
Moroccan               1
Armenian               1
Chinese/Japanese       1
Salads                 1
Name: cuisine, Length: 70, dtype: int64

In [61]:
df['quartier'].value_counts()

Manhattan        1883
Queens            738
Brooklyn          684
Bronx             309
Staten Island     158
Name: quartier, dtype: int64

##### One Hot Encoding

In [62]:
# Get one hot encoding of columns cusine
cuisine = pd.get_dummies(df['cuisine'])
del df['cuisine']
df = df.join(cuisine)

# Get one hot encoding of columns quartier
quartier = pd.get_dummies(df['quartier'])
del df['quartier']
df = df.join(quartier)
df.head()

Unnamed: 0,_id,longitude,lattitude,zipcode,notes,Afghan,African,American,Armenian,Asian,...,Tex-Mex,Thai,Turkish,Vegetarian,Vietnamese/Cambodian/Malaysia,Bronx,Brooklyn,Manhattan,Queens,Staten Island
0,61f15a93e0cb0c134b208f69,-73.856077,40.848447,10462,2,0,0,0,0,0,...,0,0,0,0,0,1,0,0,0,0
1,61f15a93e0cb0c134b208f6a,-73.961704,40.662942,11225,8,0,0,0,0,0,...,0,0,0,0,0,0,1,0,0,0
2,61f15a93e0cb0c134b208f6b,-73.985136,40.767692,10019,2,0,0,0,0,0,...,0,0,0,0,0,0,0,1,0,0
3,61f15a93e0cb0c134b208f6c,-73.98242,40.579505,11224,5,0,0,1,0,0,...,0,0,0,0,0,0,1,0,0,0
4,61f15a93e0cb0c134b208f6d,-73.860115,40.731174,11374,20,0,0,0,0,0,...,0,0,0,0,0,0,0,0,1,0


In [63]:
df.shape

(3772, 80)

In [64]:
# column notes to last position
notes = df['notes']
del df['notes']
df.insert(loc=79, column='notes', value=notes)
df.head()

Unnamed: 0,_id,longitude,lattitude,zipcode,Afghan,African,American,Armenian,Asian,Bagels/Pretzels,...,Thai,Turkish,Vegetarian,Vietnamese/Cambodian/Malaysia,Bronx,Brooklyn,Manhattan,Queens,Staten Island,notes
0,61f15a93e0cb0c134b208f69,-73.856077,40.848447,10462,0,0,0,0,0,0,...,0,0,0,0,1,0,0,0,0,2
1,61f15a93e0cb0c134b208f6a,-73.961704,40.662942,11225,0,0,0,0,0,0,...,0,0,0,0,0,1,0,0,0,8
2,61f15a93e0cb0c134b208f6b,-73.985136,40.767692,10019,0,0,0,0,0,0,...,0,0,0,0,0,0,1,0,0,2
3,61f15a93e0cb0c134b208f6c,-73.98242,40.579505,11224,0,0,1,0,0,0,...,0,0,0,0,0,1,0,0,0,5
4,61f15a93e0cb0c134b208f6d,-73.860115,40.731174,11374,0,0,0,0,0,0,...,0,0,0,0,0,0,0,1,0,20


In [65]:
del df['_id']
df.head()

Unnamed: 0,longitude,lattitude,zipcode,Afghan,African,American,Armenian,Asian,Bagels/Pretzels,Bakery,...,Thai,Turkish,Vegetarian,Vietnamese/Cambodian/Malaysia,Bronx,Brooklyn,Manhattan,Queens,Staten Island,notes
0,-73.856077,40.848447,10462,0,0,0,0,0,0,1,...,0,0,0,0,1,0,0,0,0,2
1,-73.961704,40.662942,11225,0,0,0,0,0,0,0,...,0,0,0,0,0,1,0,0,0,8
2,-73.985136,40.767692,10019,0,0,0,0,0,0,0,...,0,0,0,0,0,0,1,0,0,2
3,-73.98242,40.579505,11224,0,0,1,0,0,0,0,...,0,0,0,0,0,1,0,0,0,5
4,-73.860115,40.731174,11374,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,1,0,20


In [66]:
X = df.iloc[:, :-1]
y = df.iloc[:, -1]

##### Gradient Descent

In [67]:
import numpy as np

class GradientDescentLinearRegression:
    def __init__(self, learning_rate=0.01, iterations=1000):
        self.learning_rate, self.iterations = learning_rate, iterations
    
    def fit(self, X, y):
        b = 0
        m = 5
        n = X.shape[0]
        for _ in range(self.iterations):
            b_gradient = -2 * np.sum(y - m*X + b) / n
            m_gradient = -2 * np.sum(X*(y - (m*X + b))) / n
            b = b + (self.learning_rate * b_gradient)
            m = m - (self.learning_rate * m_gradient)
        self.m, self.b = m, b
        
    def predict(self, X):
        return self.m*X + self.b

##### Regression Lasso

In [None]:
from numpy import arange
from sklearn.model_selection import GridSearchCV
from sklearn.model_selection import RepeatedKFold
from sklearn.linear_model import Lasso
from sklearn.model_selection import train_test_split

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)
data_train_df = (X_train, y_train)
data_test_df = (X_test, y_test)

# define model
model = Lasso()
# define model evaluation method
cv = RepeatedKFold(n_splits=10, n_repeats=3, random_state=1)
# define grid
grid = dict()
grid['alpha'] = arange(0, 1, 0.01)
# define search
search = GridSearchCV(model, grid, scoring='neg_mean_absolute_error', cv=cv, n_jobs=-1)
# perform the search
results = search.fit(X_train, y_train)
# summarize
print('MAE: %.3f' % results.best_score_)
print('Config: %s' % results.best_params_)

MAE: -3.703
Config: {'alpha': 0.02}


In [None]:
y_pred = search.predict(X_test)
y_pred

array([10.40886072, 11.08708772, 10.61631939, ..., 11.02104205,
       11.45901402, 10.61677938])

In [None]:
import numpy as np
from sklearn.metrics import mean_absolute_error

array_y_test = np.array(y_test)
# calculate errors
mae = mean_absolute_error(array_y_test, y_pred)
mae

3.736500399798486

In [None]:
from sklearn.metrics import mean_squared_error
# calculate errors
mse = mean_squared_error(array_y_test, y_pred)
mse

37.005422358443376

In [None]:
from sklearn.metrics import mean_squared_error
# calculate errors
rmse = mean_squared_error(array_y_test, y_pred, squared=False)
rmse

6.083208229087952

##### ElasticNet

In [None]:
from numpy import arange
from pandas import read_csv
from sklearn.model_selection import GridSearchCV
from sklearn.model_selection import RepeatedKFold
from sklearn.linear_model import ElasticNet

# define model
model = ElasticNet()
# define model evaluation method
cv = RepeatedKFold(n_splits=10, n_repeats=3, random_state=1)
# define grid
grid = dict()
grid['alpha'] = [1e-5, 1e-4, 1e-3, 1e-2, 1e-1, 0.0, 1.0, 10.0, 100.0]
grid['l1_ratio'] = arange(0, 1, 0.01)
# define search
search = GridSearchCV(model, grid, scoring='neg_mean_absolute_error', cv=cv, n_jobs=-1)
# perform the search
results = search.fit(X_train, y_train)
# summarize
print('MAE: %.3f' % results.best_score_)
print('Config: %s' % results.best_params_)

KeyboardInterrupt: 

In [None]:
y_pred_elasticnet = search.predict(X_test)
y_pred_elasticnet

mae_elasticnet = mean_absolute_error(array_y_test, y_pred_elasticnet)
mae_elasticnet

mse_elasticnet = mean_squared_error(array_y_test, y_pred_elasticnet)
mse_elasticnet

rmse_elasticnet = mean_squared_error(array_y_test, y_pred_elasticnet, squared=False)
rmse_elasticnet

array([10.24503887, 11.21151345, 10.5881534 , ..., 10.86751833,
       11.80045694, 10.58755735])

##### Regression Ridge

In [None]:
from numpy import arange
from pandas import read_csv
from sklearn.model_selection import GridSearchCV
from sklearn.model_selection import RepeatedKFold
from sklearn.linear_model import Ridge

# define model
model = Ridge()
# define model evaluation method
cv = RepeatedKFold(n_splits=10, n_repeats=3, random_state=1)
# define grid
grid = dict()
grid['alpha'] = arange(0, 1, 0.01)
# define search
search = GridSearchCV(model, grid, scoring='neg_mean_absolute_error', cv=cv, n_jobs=-1)
# perform the search
results = search.fit(X_train, y_train)
# summarize
print('MAE: %.3f' % results.best_score_)
print('Config: %s' % results.best_params_)

MAE: -3.775
Config: {'alpha': 0.99}


In [None]:
y_pred_ridge = search.predict(X_test)
y_pred_ridge

mae_ridge = mean_absolute_error(array_y_test, y_pred_ridge)
mae_ridge

mse_ridge = mean_squared_error(array_y_test, y_pred_ridge)
mse_ridge

rmse_ridge = mean_squared_error(array_y_test, y_pred_ridge, squared=False)
rmse_ridge

array([10.21418107, 11.32073297, 10.54695348, ..., 10.64697462,
       12.38477704, 10.5461822 ])

- Quartier et Cuisine en catégorie
- Extraire grade et score
- Extraire longitude et latitude (variable adress)
- Split train set et test set
- Regression : transformer output en classe et comparer avec le modele de classification
- Choisir metrics de regression et classification
- Arbre de décision, Random Forest, Xgboost