In [1]:
!pip install pyarrow
!pip install fastparquet
!pip install --upgrade panda
!pip install --upgrade dask
!pip install pyspark



In [1]:
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import pyarrow.csv as pv
import fastparquet
import dask.dataframe as dd
import csv
import shutil
from pathlib import Path
from pandas.io import parsers
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark import SparkContext, SparkConf
from pyspark.sql import functions as F
from google.cloud import storage

In [2]:
cwd = Path.cwd()
dataset = 'yelp_dataset'

In [3]:
def convert_json_to_csv(json_file_path):
    # Read the JSON file in chunks, to avoid MemoryError
    # lines=True for the json file with one object per line
    chunks = pd.read_json(json_file_path, lines=True, chunksize=100000)
    filename = json_file_path.stem + '.csv'
    for i, chunk in enumerate(chunks): # Found some unusual rows on reviews, 
        chunk.dropna().to_csv(filename,
            header=(i==0), mode='a', index=True, index_label='index')

In [4]:
def append_if_exist(dataset_name):
    cwd = Path.cwd()
    json_name = dataset_name + '.json'
    csv_name = dataset_name + '.csv'
    json_file_path_user = cwd.joinpath('yelp_dataset', json_name)
    if cwd.joinpath(csv_name).is_file()==False:
        convert_json_to_csv(json_file_path_user)
    else:
        # Avoid duplicate appending to the same file
        cwd.joinpath(csv_name).unlink() # Deletes existing file, replace with updated one
        convert_json_to_csv(json_file_path_user)

In [5]:
files = ['yelp_academic_dataset_user', 'yelp_academic_dataset_tip', 'yelp_academic_dataset_review', 'yelp_academic_dataset_checkin', 'yelp_academic_dataset_business']
for dataset in files:
    append_if_exist(dataset)

Convert to parquet

In [15]:
append_if_exist('yelp_academic_dataset_review')

Testing Zone

TypeError: 'list' object is not callable

In [6]:
import time

start = time.time()
df2 = pd.read_csv('yelp_academic_dataset_review.csv', on_bad_lines='warn')
df2 = df2.dropna()
df2['index'] = pd.to_numeric(df2['index'])
df2.to_parquet('yelp_academic_dataset_review.parquet', compression='gzip')
end = time.time()
print(end-start)


  df2 = pd.read_csv('yelp_academic_dataset_review.csv', on_bad_lines='warn')


275.9857189655304


In [61]:
df2[2923301:2923306]

Unnamed: 0.1,Unnamed: 0,review_id,user_id,business_id,stars,useful,funny,cool,text,date
2923301,2923301,Gmd5aXXHFBMTaiDt7rf-Sg,rPN6v1sNLgoLq7sJXZMvUA,owrGcj5a7W0QnWA7HwJ2Ag,2.0,6.0,3.0,4.0,UPDATE: this store is no longer open 24/7.\n\n...,2015-06-27 03:01:22
2923302,2923302,VVNJLuayIj6SbhtOY3XIcA,DbNKK25oOzfHxyfBHlOaDg,-dwqsjGDBnzzv2qQYyIC3g,5.0,1.0,0.0,0.0,I noticed this place on the way to work one mo...,
2923303,We were welcomed at the door. The staff here w...,,,,,,,,,
2923304,There were so many great menu choices and a lo...,2016-03-21 19:31:55,,,,,,,,
2923305,2923303,P7g9LAj9XJlWSyP5KFqY0Q,ZQ-hVBie3ls9u951cJkgSQ,3nuDjIVfclE99gf4KBidKw,3.0,0.0,0.0,0.0,It's a Tampa thing...\n\nI was at Westshore Pl...,2018-01-12 22:14:25


In [18]:

import time

start = time.time()
sc.stop()
sc =SparkContext()
spark = SparkSession \
    .builder \
    .master("local[*]") \
    .appName("Transform Spark Session") \
    .getOrCreate()

df = spark.read.options(header=True,  escape='"', quotes='"', sep=',', inferSchema=True, limit=5).csv('yelp_academic_dataset_review.csv', multiLine=True)
df.write.parquet('yelp_academic_dataset_review.parquet', compression="gzip")
end = time.time()
print(end-start)


23/01/01 16:23:36 WARN SparkContext: Another SparkContext is being constructed (or threw an exception in its constructor). This may indicate an error, since only one SparkContext should be running in this JVM (see SPARK-2243). The other SparkContext was created at:
org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
java.base/jdk.internal.reflect.DirectConstructorHandleAccessor.newInstance(DirectConstructorHandleAccessor.java:67)
java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:500)
java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:484)
py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
py4j.Gateway.invoke(Gateway.java:238)
py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
py4j.ClientServerConnection.waitForCommands(ClientServerConnection.ja

IllegalArgumentException: None

+--------------------+
|                text|
+--------------------+
|Even though there...|
|Please note: the ...|
|Is it okay to say...|
|Great staff.  Not...|
|Nice view from ou...|
|Is there any way ...|
|Updating previous...|
|This hotel has wo...|
|Love this place! ...|
|Large menu. Clean...|
|Delicious crab co...|
|Freddy's has the ...|
|This place had fa...|
|I really had a wo...|
|Allegro's is grea...|
|This place treats...|
|Two very good exp...|
|One of their gues...|
|IF YOU ARE ON A S...|
|I took an 8 year ...|
+--------------------+
only showing top 20 rows



In [101]:
ddf2 = dd.from_pandas(df2, npartitions=10)

In [102]:
ddf2.to_parquet('yelp_academic_dataset_review.parquet', compression='gzip')

In [8]:
df2['Unnamed: 0'] = pd.to_numeric(df2['Unnamed: 0'])

In [103]:
df2.to_parquet('yelp_academic_dataset_review.parquet', compression='gzip', single_file = True)

TypeError: __cinit__() got an unexpected keyword argument 'single_file'

In [5]:
def spark_process_csv(name):
    csv_name = name + '.csv'
    spark = SparkSession \
        .builder \
        .master("local[*]") \
        .appName("Transform Spark Session") \
        .config("spark.jars", "/opt/homebrew/Cellar/apache-spark/3.3.1/libexec/jars/gcs-connector-hadoop2-latest.jar") \
        .getOrCreate()
    spark._jsc.hadoopConfiguration().set('fs.gs.impl', 'com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem')
    df = spark.read.options(header=True,  escape='"', quotes='"', sep=',', inferSchema=True, limit=5).csv(csv_name, multiLine=True)
    df.write.parquet('gs://dana-staging/', compression="gzip")

In [6]:
def process_if_exists(name):
    csv_name = name + '.csv'
    parquet_name = name + '.parquet'
    if cwd.joinpath(csv_name).is_file()==False:
        return '.csv file missing'
    elif cwd.joinpath(parquet_name).is_dir()==True:
        shutil.rmtree(cwd.joinpath(parquet_name))
        spark_process_csv(name)
    elif cwd.joinpath(parquet_name).is_dir()==False:
        spark_process_csv(name)

In [7]:
sc.stop()
conf = SparkConf()
conf.set("spark.jars", "/opt/homebrew/Cellar/apache-spark/3.3.1/libexec/jars/gcs-connector-hadoop2-latest.jar")
conf.set('fs.gs.impl', 'com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem')
conf.set("spark.hadoop.google.cloud.auth.service.account.enable", "true")
conf.set("google.cloud.auth.service.account.json.keyfile", "alpine-furnace-373410-63fc9174bc11.json")
conf.set("spark.hadoop.fs.gs.project.id", "alpine-furnace-373410")
conf.set("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
sc = SparkContext(conf=conf)

process_if_exists('yelp_academic_dataset_review')

NameError: name 'sc' is not defined

<pyspark.conf.SparkConf at 0x28c902dc0>

In [28]:
def conversion_to_parquet(name):
    csv_name = name + '.csv'
    table = pv.read_csv(csv_name)
    pq.write_table(table, name +'.parquet')

In [22]:
# for csv in files:
#     conversion_to_parquet(csv)

conversion_to_parquet('yelp_academic_dataset_review')

ArrowInvalid: CSV parse error: Expected 10 columns, got 8: The main reason I leave this bad review is because of the banquet staff. Our waitress was very r ...