## Exploring Dask Framework & Client

Building a high-performance data processing pipeline for sampling. The code may be useful for later implementations and exploration.

In [None]:
!pip install --upgrade pip
!pip install "dask[complete]" python-decouple pandas pyarrow

### Start Dask Client for Dashboard

**Source** https://examples.dask.org/dataframe.html

In [1]:
from dask.distributed import Client, progress
client = Client(n_workers=4, threads_per_worker=2, memory_limit='2GB')

In [2]:
# Show client stats (dashboard)
client

Client: Scheduler: tcp://127.0.0.1:44829, Dashboard: http://127.0.0.1:8787/status
Cluster: Workers: 4, Cores: 8, Memory: 8.00 GB


### Download S3 Data to dask dataframe

In [3]:
# Connect to S3
from awstools import s3

def get_bucket(bucket_name='yelp-data-shared-labs18'):
    return s3.Bucket(bucket_name)

In [4]:
import dask
import dask.dataframe as dd
import pandas as pd
import os

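def load_json(filename, npartitions=4):
    """
    Load a local JSON file into a dask dataframe.

    The file must already be downloaded into the current working directory.

    Parameters
    ----------
    filename : str
        Path of the JSON file, relative to the current working directory.
    npartitions : int, optional
        Number of partitions for the resulting dataframe.

    Returns
    -------
    dask.dataframe.DataFrame
    """
    filepath = os.path.join(os.getcwd(), filename)
    # Any error raised by dd.read_json propagates to the caller unchanged.
    dask_df = dd.read_json(filepath)
    return dask_df.repartition(npartitions=npartitions)
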

In [None]:
# Manually download a file from the bucket
bucket = get_bucket()
filename = 'yelp_json/photo.json'
bucket.get(filename, 'photo.json')

In [5]:
df = load_json(filename='business.json')

In [6]:
display(type(df), df.head())

dask.dataframe.core.DataFrame

Unnamed: 0,business_id,name,address,city,state,postal_code,latitude,longitude,stars,review_count,is_open,attributes,categories,hours
0,1SWheh84yJXfytovILXOAQ,Arizona Biltmore Golf Club,2818 E Camino Acequia Drive,Phoenix,AZ,85016,33.522143,-112.018481,3.0,5,0,{'GoodForKids': 'False'},"Golf, Active Life",
1,QXAEGFB4oINsVuTFxEYKFQ,Emerald Chinese Restaurant,30 Eglinton Avenue W,Mississauga,ON,L5R 3E7,43.605499,-79.652289,2.5,128,1,"{'RestaurantsReservations': 'True', 'GoodForMe...","Specialty Food, Restaurants, Dim Sum, Imported...","{'Monday': '9:0-0:0', 'Tuesday': '9:0-0:0', 'W..."
2,gnKjwL_1w79qoiV3IC_xQQ,Musashi Japanese Restaurant,"10110 Johnston Rd, Ste 15",Charlotte,NC,28210,35.092564,-80.859132,4.0,170,1,"{'GoodForKids': 'True', 'NoiseLevel': 'u'avera...","Sushi Bars, Restaurants, Japanese","{'Monday': '17:30-21:30', 'Wednesday': '17:30-..."
3,xvX2CttrVhyG2z1dFg_0xw,Farmers Insurance - Paul Lorenz,"15655 W Roosevelt St, Ste 237",Goodyear,AZ,85338,33.455613,-112.395596,5.0,3,1,,"Insurance, Financial Services","{'Monday': '8:0-17:0', 'Tuesday': '8:0-17:0', ..."
4,HhyxOkGAM07SRYtlQ4wMFQ,Queen City Plumbing,"4209 Stuart Andrew Blvd, Ste F",Charlotte,NC,28217,35.190012,-80.887223,4.0,4,1,"{'BusinessAcceptsBitcoin': 'False', 'ByAppoint...","Plumbing, Shopping, Local Services, Home Servi...","{'Monday': '7:0-23:0', 'Tuesday': '7:0-23:0', ..."


In [None]:
# Files to sample, as (filename, size in MB)
file_list = [
    ('business.json', 132),
    ('user.json', 2300),
    ('review.json', 5000),
    ('photo.json', 25),
    ('checkin.json', 390),
    ('tip.json', 234),
]

# Maximum size of each sampled output file
max_file_size = 20  # MB

# Fraction of each dataset to keep so the sample fits within max_file_size
fraction_list = [(name, max_file_size / size) for name, size in file_list]


print(fraction_list)
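
The fraction calculation above assumes every file is larger than the size cap. A minimal sketch of a variant that clamps the fraction to 1.0 for files already under the cap might look like this. `sampling_fractions` is a hypothetical helper name, `small.json` is a made-up entry for illustration, and the sketch assumes output size scales roughly linearly with the number of rows kept.

```python
def sampling_fractions(file_sizes_mb, max_size_mb):
    """Map each filename to the fraction of rows to keep.

    Hypothetical helper: assumes output size scales roughly linearly
    with the number of rows sampled.
    """
    fractions = {}
    for name, size_mb in file_sizes_mb:
        if size_mb <= 0:
            raise ValueError(f"size for {name!r} must be positive")
        # Never ask for more than the whole dataset.
        fractions[name] = min(1.0, max_size_mb / size_mb)
    return fractions


fractions = sampling_fractions(
    [('business.json', 132), ('photo.json', 25), ('small.json', 10)],  # small.json is made up
    max_size_mb=20,
)
```

Returning a dict rather than a list of tuples makes it easy to look up the fraction for a given file when calling the sampling function later.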

So far, pandas would work as a drop-in replacement for Dask. I think staying with Dask is a good idea for now, in case we want to combine sampling with computation (for example, future feature generation via `submit(apply_func, dataframe)`).

## Generate Sample Data

Create a sampling procedure to get a subset of the data. The early version is a simple random sample, but some time-series analysis may be useful for future samples.

In [7]:
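def sample_data(dataframe, fraction, filename=None):
    """
    Randomly sample a fraction of a dask dataframe without replacement.

    Returns the sample as a pandas DataFrame, or, if `filename` is given,
    writes the sample to a parquet file and returns None.
    """
    # .compute() materializes the lazy dask sample as a pandas DataFrame
    sample = dataframe.sample(frac=fraction, replace=False).compute()
    if filename is None:
        return sample
    sample.to_parquet(filename)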

In [8]:
sdf = sample_data(df, 0.2)
sdf.head()

Unnamed: 0,business_id,name,address,city,state,postal_code,latitude,longitude,stars,review_count,is_open,attributes,categories,hours
191897,5vn454t2dY3qMEBjQuUkLQ,Union Orthotics & Prosthetics,3424 Liberty Ave,Pittsburgh,PA,15201,40.461434,-79.964885,3.5,4,1,"{'BusinessAcceptsCreditCards': 'True', 'ByAppo...","Pediatricians, Doctors, Health & Medical, Orth...",
139946,x1KYj6p88WAcBxNlj8Ky7A,Pizza Hut,8860 N 43rd Ave,Glendale,AZ,85302,33.565843,-112.152178,2.5,15,1,"{'OutdoorSeating': 'False', 'RestaurantsGoodFo...","Italian, Restaurants, Chicken Wings, Pizza",
152628,gbjIwB_r6y4YhaZMwEK6mw,Brick Street Bakery,27 Trinity Street,Toronto,ON,M5A 3C4,43.650574,-79.35948,4.0,116,1,"{'GoodForKids': 'True', 'OutdoorSeating': 'Tru...","Sandwiches, Restaurants, Food, Bakeries","{'Monday': '8:30-18:0', 'Tuesday': '8:30-19:0'..."
185376,zPfSGU3hzNPNV8twXBiayA,Best Impressions Caterers,2600 Youngblood St,Charlotte,NC,28203,35.204082,-80.868594,4.0,7,1,,"Caterers, Restaurants, Event Planning & Services","{'Monday': '8:30-17:30', 'Tuesday': '8:30-17:3..."
140466,4dvfZ23X1ryVNbZRbeNq-A,All Paws Pet Grooming,5701 State Rd,Cleveland,OH,44134,41.40715,-81.709902,4.0,4,1,,"Pet Sitting, Pets, Pet Services, Pet Groomers","{'Monday': '8:0-0:0', 'Tuesday': '8:0-0:0', 'W..."


In [11]:
sdf.to_parquet('sample_business.parquet')

NotImplementedError: struct<AcceptsInsurance: string, AgesAllowed: string, Alcohol: string, Ambience: string, BYOB: string, BYOBCorkage: string, BestNights: string, BikeParking: string, BusinessAcceptsBitcoin: string, BusinessAcceptsCreditCards: string, BusinessParking: string, ByAppointmentOnly: string, Caters: string, CoatCheck: string, Corkage: string, DietaryRestrictions: string, DogsAllowed: string, DriveThru: string, GoodForDancing: string, GoodForKids: string, GoodForMeal: string, HairSpecializesIn: string, HappyHour: string, HasTV: string, Music: string, NoiseLevel: string, Open24Hours: string, OutdoorSeating: string, RestaurantsAttire: string, RestaurantsCounterService: string, RestaurantsDelivery: string, RestaurantsGoodForGroups: string, RestaurantsPriceRange2: string, RestaurantsReservations: string, RestaurantsTableService: string, RestaurantsTakeOut: string, Smoking: string, WheelchairAccessible: string, WiFi: string>

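**sample_data()** returns a pandas DataFrame, not a dask dataframe! Dask's `DataFrame.sample()` is lazy, and the `.compute()` call inside `sample_data()` materializes the result as pandas. The `to_parquet` call above is therefore the pandas one, and the `NotImplementedError` appears to come from the nested `attributes` column, which pyarrow sees as a struct type.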
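
One possible workaround for the `NotImplementedError` above, assuming it is caused by the dict-valued `attributes` and `hours` columns, is to encode nested values as JSON strings before writing parquet. This is only a sketch: `stringify_nested_columns` is a hypothetical helper, not part of pandas or Dask, and the two-row frame is made-up example data.

```python
import json

import pandas as pd


def stringify_nested_columns(frame):
    """Return a copy with dict/list cells encoded as JSON strings.

    Hypothetical helper, not part of pandas or Dask.
    """
    out = frame.copy()
    for column in out.columns:
        is_nested = out[column].map(lambda v: isinstance(v, (dict, list)))
        if is_nested.any():
            # Missing values (None/NaN) are passed through unchanged.
            out[column] = out[column].map(
                lambda v: json.dumps(v) if isinstance(v, (dict, list)) else v
            )
    return out


# Made-up example rows shaped like the business data
example = pd.DataFrame({
    'name': ['A', 'B'],
    'attributes': [{'GoodForKids': 'True'}, None],
})
flat = stringify_nested_columns(example)
```

With the nested columns flattened to strings, `flat.to_parquet(...)` should no longer encounter a struct type, and `json.loads` can restore the dictionaries after reading the file back.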

In [None]:
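# Writes parquet; may raise the NotImplementedError shown above
# until the nested columns are handled
sample_data(df, 0.2, 'sample_business.parquet')

### Read from parquet (into pandas, now that we have a smaller file)

In [None]:
import pandas as pd

# Use a distinct name so the sample_data() function is not shadowed
sample_df = pd.read_parquet('sample_business.parquet')
sample_df.head()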

### List Bucket Contents

Emulate a directory structure on top of S3's flat key storage.

In [None]:
for key in s3.get_bucket_keys(bucket_name='yelp-data-shared-labs18'):
    print(key)

In [None]:
bucket = s3.Bucket(bucket_name='yelp-data-shared-labs18')
bucket.contents

In [None]:
bucket.dir(all=False)

In [None]:
for parq in bucket.find(suffix="parquet"):
    print(parq)
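
The directory emulation used in this section can be sketched in plain Python: S3 stores objects under flat keys, and a "directory" is just a shared key prefix up to a delimiter. The helper below is a hypothetical illustration, not the `awstools` implementation, and every key except `yelp_json/photo.json` is made up.

```python
def list_dir(keys, prefix='', delimiter='/'):
    """List the immediate children of `prefix` among flat object keys.

    Hypothetical helper; "directories" keep a trailing delimiter.
    """
    children = set()
    for key in keys:
        if not key.startswith(prefix):
            continue
        remainder = key[len(prefix):]
        if not remainder:
            continue  # the key is the prefix itself
        head, sep, _ = remainder.partition(delimiter)
        # A separator means more levels follow, so head is a directory.
        children.add(head + sep)
    return sorted(children)


# Made-up keys for illustration
keys = [
    'yelp_json/photo.json',
    'yelp_json/business.json',
    'samples/sample_business.parquet',
    'README.md',
]
top_level = list_dir(keys)
yelp_files = list_dir(keys, prefix='yelp_json/')
```

Here `top_level` holds the entries at the bucket root and `yelp_files` holds the contents of the `yelp_json/` prefix. This mirrors the prefix and delimiter parameters that the S3 list API itself accepts.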