In [21]:
import pandas as pd
from sodapy import Socrata
import pyspark
from pyspark.sql.functions import concat_ws
from sqlalchemy import create_engine

class checkout_titles:
    '''Pull, persist and sample checkout records from a Socrata open-data portal.

    Attributes:
        limit: default number of rows requested by load_subset().
        datasource: Socrata domain to query.
        ID: Socrata dataset identifier.
        results_df: pandas DataFrame holding the most recent load_subset() result.
        update_df: pandas DataFrame built by update_dataset(), or None.
    '''

    def __init__(self, limit=1000, datasource="data.seattle.gov", ID="tmmm-ytt6", results_df=None):
        '''Default to call the Seattle Public Library Checkouts
        dataset, but it can call other databases via socrata.

        Args:
            limit: default row limit used by load_subset() when none is given.
            datasource: Socrata domain hosting the dataset.
            ID: Socrata dataset identifier.
            results_df: optional pre-loaded pandas DataFrame.
        '''
        self.limit = limit
        self.datasource = datasource
        self.ID = ID
        self.results_df = results_df
        # Defined up front so save_subset(update=True) can give a clear error
        # instead of an AttributeError when update_dataset() was never called.
        self.update_df = None

    def load_subset(self, limit=None, query=False):
        '''Uses the sodapy Socrata client to load rows of the configured dataset.

        Args:
            limit: maximum number of rows to fetch when no query is given.
                Defaults to the limit passed to the constructor.
            query: optional SoQL query string. When given, `limit` is ignored
                and any row limit must be part of the query itself.

        Returns:
            The first five rows of the loaded DataFrame (also stored in full
            on self.results_df).

        Raises:
            Whatever the Socrata client raises; for HTTP errors a hint about
            query syntax is printed before the error is re-raised.
        '''
        if limit is None:
            limit = self.limit
        client = Socrata(self.datasource, None)
        try:
            if query is not False and query is not None:
                try:
                    results = client.get(self.ID, query=query)
                except Exception as err:
                    # The client reports rejected queries as an HTTPError from
                    # `requests`, which this notebook does not import; match on
                    # the class name rather than adding a new dependency.
                    if type(err).__name__ == 'HTTPError':
                        print('For queries, terms, such as select, groupby, or where,',
                              'need to be lowercase. Strings need to be in quotes',
                              'e.g. "EBOOK" and numbers need to not have quotes.')
                    # Re-raise: there is no result to build a DataFrame from.
                    raise
            else:
                results = client.get(self.ID, limit=limit)
        finally:
            # Release the underlying HTTP session whether or not the call worked.
            client.close()
        self.results_df = pd.DataFrame.from_records(results)
        return self.results_df.head()

    def save_subset(self, filepath, local=False, update=False):
        '''Saves the loaded (or concatenated) data as a CSV file or a SQLite table.

        Args:
            filepath: CSV path when `local` is true, otherwise the name of the
                table inside SeattleLibraryCheckouts.db.
            local: write a CSV file instead of a database table.
            update: save self.update_df (from update_dataset()) instead of
                self.results_df. For the database, rows are appended to the
                table; without `update` the write fails if the table exists.

        Raises:
            ValueError: if the selected DataFrame has not been loaded yet.
        '''
        frame = self.update_df if update else self.results_df
        if frame is None:
            source = 'update_dataset()' if update else 'load_subset()'
            raise ValueError('Nothing to save: call ' + source + ' first.')

        if local:
            frame.to_csv(filepath)
            return

        engine = create_engine('sqlite:///SeattleLibraryCheckouts.db')
        try:
            frame.to_sql(filepath, con=engine,
                         if_exists='append' if update else 'fail', index=False)
        finally:
            # Close pooled connections so the SQLite file is not left locked.
            engine.dispose()

    def update_dataset(self, filepaths=None):
        '''Concatenates previously saved CSV subsets into one dataframe, update_df.

        Each CSV file is loaded into a pandas dataframe and the dataframes are
        stacked row-wise.

        Args:
            filepaths: list of CSV file paths (a single path string is also
                accepted).

        Returns:
            The first five rows of the combined DataFrame.

        Raises:
            ValueError: if no file paths are given.
        '''
        if isinstance(filepaths, str):
            # A bare string would otherwise be iterated character by character.
            filepaths = [filepaths]
        if not filepaths:
            raise ValueError('filepaths must contain at least one CSV file path.')

        df_list = [pd.read_csv(path) for path in filepaths]
        self.update_df = pd.concat(df_list)
        return self.update_df.head()

    def sampling(self, csv_file, fract, stratified=False, strata_col=None):
        '''Reads a CSV with Spark and prints five rows of a sample of it.

        Args:
            csv_file: path of the CSV file to read (header row expected).
            fract: sampling fraction for random sampling; for stratified
                sampling, a dict mapping each stratum value to its fraction.
            stratified: use stratified sampling instead of random sampling.
            strata_col: column that defines the strata; required when
                `stratified` is true.

        Returns:
            None; the sampled rows are printed by DataFrame.show().

        Raises:
            ValueError: if stratified sampling is requested without strata_col.
        '''
        # Validate before starting Spark so a bad call fails fast.
        if stratified and strata_col is None:
            raise ValueError('strata_col is required when stratified=True.')

        spark = pyspark.sql.SparkSession.builder.getOrCreate()
        chkouts = spark.read.csv(csv_file, header='true', inferSchema='true', sep=',')
        if stratified:
            # sampleByKey only exists on pair RDDs; sampleBy is the DataFrame
            # API for stratified sampling without replacement.
            subset = chkouts.sampleBy(strata_col, fractions=fract)
        else:
            subset = chkouts.sample(fraction=fract)
        return subset.show(5)

In [22]:
# Create the helper with its defaults: the Seattle Public Library checkouts
# dataset ("tmmm-ytt6" on data.seattle.gov) and a row limit of 1000.
pl = checkout_titles()

In [23]:
# Location of the Checkouts_by_Title CSV file that the Spark cells read.
# NOTE(review): absolute, machine-specific path; this cell will not run on
# another machine. Consider a path relative to the repository instead.
csv_file = 'C:/Users/louisa/Documents/GitHub/checkout_my_book/Checkouts_by_Title.csv'

In [24]:
# Print 5 rows of a random sample of the CSV taken with fraction 0.2
# (stratified defaults to False, so plain random sampling is used).
pl.sampling(csv_file, 0.2)

+----------+------------+------------+------------+-------------+---------+--------------------+------------------+--------------------+--------------------+---------------+
|UsageClass|CheckoutType|MaterialType|CheckoutYear|CheckoutMonth|Checkouts|               Title|           Creator|            Subjects|           Publisher|PublicationYear|
+----------+------------+------------+------------+-------------+---------+--------------------+------------------+--------------------+--------------------+---------------+
|  Physical|     Horizon|   SOUNDDISC|        2009|            7|       12|Funk box [sound r...|              null|Funk Music, Popul...|Hip-O Records : D...|         p2000.|
|  Physical|     Horizon|        BOOK|        2009|            7|        1|Say yes / Audrey ...|Couloumbis, Audrey|Stepmothers Juven...|           Putnam's,|         c2002.|
|  Physical|     Horizon|        BOOK|        2009|            7|        7|Math to the max 1...|              null|Mathematics Pro

In [9]:
# Get (or reuse) the Spark session and read the CSV with a header row and
# inferred column types. These are the same two statements that
# checkout_titles.sampling runs internally, repeated here so the DataFrame
# is available to the cells below.
spark = pyspark.sql.SparkSession.builder.getOrCreate()
chkouts = spark.read.csv(csv_file, header='true', inferSchema='true', sep=',')

In [10]:
# Random sample of the checkouts with fraction 0.1. No seed is passed, so the
# rows drawn (and the count below) can differ from run to run.
subset = chkouts.sample(fraction=0.1)

In [11]:
# Number of rows in the sample.
subset.count()

3845200

In [None]:
# Draw a new sample with fraction 0.2 (rebinding `subset`) and collect it into
# a pandas DataFrame on the driver.
# NOTE(review): the recorded output for this cell is a Py4J error with a
# connection reset, i.e. the cell did not complete. Presumably the sample is too
# large to collect into driver memory — confirm before re-running.
subset = chkouts.sample(fraction=0.2)
results_df = subset.toPandas()

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:54093)
Chained exceptions have been truncated to avoid stack overflow in traceback formatting:
Traceback (most recent call last):
  File "C:\Users\louisa\anaconda3\envs\metis\lib\site-packages\IPython\core\interactiveshell.py", line 2061, in showtraceback
    stb = value._render_traceback_()
AttributeError: 'Py4JJavaError' object has no attribute '_render_traceback_'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\Users\louisa\anaconda3\envs\metis\lib\site-packages\py4j\java_gateway.py", line 1200, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "C:\Users\louisa\anaconda3\envs\metis\lib\socket.py", line 704, in readinto
    return self._sock.recv_into(b)
ConnectionResetError: [WinError 10054] An existing connection was forcibly closed by the remote host

During handling of the above exception, a

In [3]:
# Load the 2019 checkouts through the Socrata API and show the first five rows.
# When a query is supplied, load_subset does not use `limit`, so limit=1000 has
# no effect here; the row cap comes from the `limit 5000000` clause in the query.
pl.load_subset(limit=1000, query='select * where checkoutyear=2019 and checkoutmonth < 13 limit 5000000')



Unnamed: 0,usageclass,checkouttype,materialtype,checkoutyear,checkoutmonth,checkouts,title,creator,subjects,publisher,publicationyear
0,Digital,OverDrive,EBOOK,2019,1,1,Dog Diaries #6: Sweetie,Kate Klimo,"Juvenile Fiction, Juvenile Literature","Random House, Inc.",2015
1,Digital,OverDrive,EBOOK,2019,2,1,Delicious,Lori Foster,"Fiction, Literature, Romance, Short Stories","Random House, Inc.",2017
2,Digital,OverDrive,EBOOK,2019,2,2,A Journal of the Plague Year,Daniel Defoe,"Fiction, Literature",ePenguin,2011
3,Digital,OverDrive,EBOOK,2019,3,2,The Cocoa Conspiracy,Andrea Penrose,"Fiction, Mystery","Penguin Group (USA), Inc.",2013
4,Physical,Horizon,VIDEODISC,2019,3,2,India / Pilot Film and TV Productions.,,India Description and travel,"Escapi,",[2003]


In [5]:
# With the default local=False, this writes the loaded rows to a table named
# 'chkouts2019' in SeattleLibraryCheckouts.db. With the default update=False the
# write fails if that table already exists.
pl.save_subset('chkouts2019')