# Retail food store data

In [1]:
import json
import pandas as pd

# Parse the raw retail food store export into a plain dictionary.
with open('retail_food_stores.json', 'r', encoding='utf-8') as file:
    data = json.load(file)

# One-row frame built from the 'meta' entry, tagged with a constant join key.
metadata = pd.DataFrame(data['meta'], index=[0]).assign(temp=1)

# One row per record of the 'data' entry, tagged with the same join key.
df = pd.DataFrame(data['data']).assign(temp=1)

# Every row on both sides has temp == 1, so joining on it attaches the single
# metadata row to each data record.
result = metadata.merge(df, on='temp')

# Show the merged frame as plain text.
print(result)

       view  temp                   0                                     1  \
0       NaN     1  row-656w.xr9g-izkp  00000000-0000-0000-830A-FB544A42623C   
1       NaN     1  row-sip5~swjf.mic4  00000000-0000-0000-1D72-DEFD8206793E   
2       NaN     1  row-h2ap-c7pr_kkgm  00000000-0000-0000-87EC-56223CC693F4   
3       NaN     1  row-ut8p_acga_u6xi  00000000-0000-0000-2B14-8A8858FE407B   
4       NaN     1  row-6ytw~jem2.4fnp  00000000-0000-0000-BDB9-7D5D5C295FDF   
...     ...   ...                 ...                                   ...   
28515   NaN     1  row-v9cd-gbc6.6u3b  00000000-0000-0000-698E-B98F201A4CB5   
28516   NaN     1  row-eizm.b2mt~yi3g  00000000-0000-0000-9CD9-EFBC6C615B05   
28517   NaN     1  row-ditr-fpqg-nayr  00000000-0000-0000-E01E-5F5CFE3BBE69   
28518   NaN     1  row-i766-7r6j-tuki  00000000-0000-0000-30C0-8088F99C64A1   
28519   NaN     1  row-uuzr~e7ps~79md  00000000-0000-0000-BD04-EC6A3A14BF2F   

       2           3     4           5     6    7  

In [2]:
# Snapshot the current column labels as a plain list and show them.
column_names = result.columns.tolist()

print(column_names)

['view', 'temp', 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23]


In [3]:
# Give the positional columns 0-8 readable names; 'view' and 'temp' keep
# their existing labels.
leading_labels = [
    'sid', 'id', 'position', 'created_at', 'created_meta',
    'updated_at', 'updated_meta', 'meta_data', 'County',
]
result.rename(columns=dict(enumerate(leading_labels)), inplace=True)

# Show the labels after the rename.
column_names = result.columns.tolist()

print(column_names)

['view', 'temp', 'sid', 'id', 'position', 'created_at', 'created_meta', 'updated_at', 'updated_meta', 'meta_data', 'County', 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23]


In [4]:
# Give the remaining positional columns 9-23 readable names.
retail_field_labels = [
    'License_Number', 'Operation_Type', 'Establishment_Type',
    'Entity_Name', 'DBA_Name', 'Street_Number', 'Street_Name',
    'Address_Line_2', 'Address_Line_3', 'City', 'State', 'Zip_Code',
    'Square_Footage', 'Georeference', 'NYS_Municipal_Boundaries',
]
result.rename(columns=dict(enumerate(retail_field_labels, start=9)), inplace=True)

# Show the labels after the rename.
column_names = result.columns.tolist()

print(column_names)

['view', 'temp', 'sid', 'id', 'position', 'created_at', 'created_meta', 'updated_at', 'updated_meta', 'meta_data', 'County', 'License_Number', 'Operation_Type', 'Establishment_Type', 'Entity_Name', 'DBA_Name', 'Street_Number', 'Street_Name', 'Address_Line_2', 'Address_Line_3', 'City', 'State', 'Zip_Code', 'Square_Footage', 'Georeference', 'NYS_Municipal_Boundaries']


# Create Cassandra session

In [36]:
import os
from getpass import getpass

from cassandra.cluster import Cluster
from cassandra.policies import DCAwareRoundRobinPolicy
from cassandra.auth import PlainTextAuthProvider

# Credentials come from the environment so the password is not stored in the
# notebook. CASSANDRA_USERNAME falls back to the account name used previously;
# when CASSANDRA_PASSWORD is unset the password is prompted for without echo.
cassandra_username = os.environ.get('CASSANDRA_USERNAME', 'Test')
cassandra_password = os.environ.get('CASSANDRA_PASSWORD') or getpass('Cassandra password: ')
auth_provider = PlainTextAuthProvider(username=cassandra_username, password=cassandra_password)

# Connect to the Cassandra node on localhost and open a session bound to the
# 'dapdb' keyspace. DCAwareRoundRobinPolicy is imported but not used by this
# connection; it would be passed as load_balancing_policy= if a data-centre
# aware policy were required.
cluster = Cluster(['127.0.0.1'], port=9042, auth_provider=auth_provider)
session = cluster.connect('dapdb')


In [6]:
# Remove any earlier copy of the retail table so it can be recreated from scratch.
drop_retail_cql = 'DROP TABLE IF EXISTS dapdb.retail_food_stores'
delete_stmnt = session.prepare(drop_retail_cql)
session.execute(delete_stmnt)

<cassandra.cluster.ResultSet at 0x16e87dca670>

# Create table of Retail_Food_Stores

In [7]:
# Prepare the CREATE TABLE statement for the retail stores table: 'id' is the
# primary key and every column is stored as varchar. The adjacent literals
# concatenate into a single-line CQL string.
create_statement = session.prepare(
    'CREATE TABLE Retail_Food_Stores ('
    'id varchar PRIMARY KEY,'
    'County varchar,'
    'License_Number varchar,'
    'Operation_Type varchar,'
    'Establishment_Type varchar,'
    'Entity_Name varchar,'
    'DBA_Name varchar,'
    'Street_Number varchar,'
    'Street_Name varchar,'
    'Address_Line_2 varchar,'
    'Address_Line_3 varchar,'
    'City varchar,'
    'State varchar,'
    'Zip_Code varchar,'
    'Square_Footage varchar,'
    'Georeference varchar,'
    'NYS_Municipal_Boundaries varchar)'
)

In [8]:
# Run the prepared CREATE TABLE statement; as the last expression of the cell
# the returned ResultSet is what gets displayed.
session.execute(create_statement)


<cassandra.cluster.ResultSet at 0x16e87e19a60>

In [9]:
# Prepare the parameterised INSERT for the retail stores table: 17 columns,
# 17 positional placeholders. The adjacent literals concatenate into a
# single-line CQL string.
insert_statement = session.prepare(
    'INSERT INTO retail_food_stores ('
    'id,County, License_Number, Operation_Type, Establishment_Type, '
    'Entity_Name, DBA_Name, Street_Number, Street_Name, '
    'Address_Line_2, Address_Line_3, City, State, Zip_Code, '
    'Square_Footage, Georeference, NYS_Municipal_Boundaries'
    ') VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)'
)

In [10]:
from cassandra.concurrent import execute_concurrent_with_args

# Columns bound to the INSERT placeholders, in placeholder order.
retail_insert_columns = [
    'id', 'County', 'License_Number', 'Operation_Type', 'Establishment_Type',
    'Entity_Name', 'DBA_Name', 'Street_Number', 'Street_Name',
    'Address_Line_2', 'Address_Line_3', 'City', 'State', 'Zip_Code',
    'Square_Footage', 'Georeference', 'NYS_Municipal_Boundaries',
]

# One parameter tuple per store. itertuples avoids building a Series per row,
# which is what iterrows does.
retail_insert_params = list(
    result[retail_insert_columns].itertuples(index=False, name=None)
)

# Pipeline the inserts instead of waiting for one round-trip per row. With the
# default raise_on_first_error=True a failed insert still raises, as the
# sequential loop did. The result list is bound to a name so the notebook does
# not echo one entry per row.
retail_insert_results = execute_concurrent_with_args(
    session, insert_statement, retail_insert_params, concurrency=50
)

# Farmers_Markets_in_New_York_State.json

In [11]:
import json
import pandas as pd

# Parse the raw farmers' market export into a plain dictionary.
with open('Farmers_Markets_in_New_York_State.json', 'r', encoding='utf-8') as file:
    farmer_data = json.load(file)

# One-row frame built from the 'meta' entry, tagged with a constant join key.
farmer_metadata = pd.DataFrame(farmer_data['meta'], index=[0]).assign(temp=1)

# One row per record of the 'data' entry, tagged with the same join key.
df_farmer = pd.DataFrame(farmer_data['data']).assign(temp=1)

# Every row on both sides has temp == 1, so joining on it attaches the single
# metadata row to each data record.
farmer_result = farmer_metadata.merge(df_farmer, on='temp')

# Show the first rows of the merged frame as plain text.
print(farmer_result.head())

   view  temp                   0                                     1  2  \
0   NaN     1  row-8bvy~av76~b29y  00000000-0000-0000-28E0-E06AD96EE2A7  0   
1   NaN     1  row-s6nz~k9hb_cy4t  00000000-0000-0000-A626-382A53B00DBE  0   
2   NaN     1  row-3rrp_2m92-gba2  00000000-0000-0000-3EC6-E88B89C60758  0   
3   NaN     1  row-tvpw.ird8-ib3y  00000000-0000-0000-CF90-47907C6B0A6A  0   
4   NaN     1  row-aeje_yd3e.62nv  00000000-0000-0000-F62C-A22FDBBC23B5  0   

            3     4           5     6    7  ...  \
0  1680644578  None  1680644578  None  { }  ...   
1  1680644578  None  1680644578  None  { }  ...   
2  1680644578  None  1680644578  None  { }  ...   
3  1680644578  None  1680644578  None  { }  ...   
4  1680644578  None  1680644578  None  { }  ...   

                                            18                   19     20 21  \
0  Mon-Fri 10am-5pm, Sat 10a-4p  Sun 10a-4:30p   June 1-December 23      M  Y   
1                                  Sat 9am-1pm    May 6-Decemb

In [12]:
# Snapshot the current column labels as a plain list and show them.
farmer_result_column_names = farmer_result.columns.tolist()

print(farmer_result_column_names)

['view', 'temp', 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27]


In [13]:
# Give the positional columns 0-8 readable names; 'view' and 'temp' keep
# their existing labels.
farmer_leading_labels = [
    'sid', 'id', 'position', 'created_at', 'created_meta',
    'updated_at', 'updated_meta', 'meta_data', 'County',
]
farmer_result.rename(columns=dict(enumerate(farmer_leading_labels)), inplace=True)

# Show the labels after the rename.
farmer_result_column_names = farmer_result.columns.tolist()

print(farmer_result_column_names)

['view', 'temp', 'sid', 'id', 'position', 'created_at', 'created_meta', 'updated_at', 'updated_meta', 'meta_data', 'County', 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27]


In [14]:
# Give the remaining positional columns 9-27 readable names.
farmer_field_labels = [
    'Market_Name', 'Market_Location', 'Address_Line_1', 'City', 'State',
    'Zip', 'Contact', 'Phone', 'Market_Link', 'Operation_Hours',
    'Operation_Season', 'Operating_Months', 'FMNP', 'SNAP', 'FCC_Issued',
    'FCC_Accepted', 'Latitude', 'Longitude', 'Georeference_1',
]
farmer_result.rename(columns=dict(enumerate(farmer_field_labels, start=9)), inplace=True)

# Show the labels after the rename.
farmer_result_column_names = farmer_result.columns.tolist()

print(farmer_result_column_names)

['view', 'temp', 'sid', 'id', 'position', 'created_at', 'created_meta', 'updated_at', 'updated_meta', 'meta_data', 'County', 'Market_Name', 'Market_Location', 'Address_Line_1', 'City', 'State', 'Zip', 'Contact', 'Phone', 'Market_Link', 'Operation_Hours', 'Operation_Season', 'Operating_Months', 'FMNP', 'SNAP', 'FCC_Issued', 'FCC_Accepted', 'Latitude', 'Longitude', 'Georeference_1']


In [15]:
# Remove any earlier copy of the farmers' market table so it can be recreated from scratch.
drop_farmer_cql = 'DROP TABLE IF EXISTS dapdb.Farmer_Market_NYC'
delete_stmnt = session.prepare(drop_farmer_cql)
session.execute(delete_stmnt)

<cassandra.cluster.ResultSet at 0x18f9a1692e0>

In [16]:
# Prepare the CREATE TABLE statement for the farmers' market table: 'id' is
# the primary key and every column is stored as varchar. The adjacent literals
# concatenate into a single-line CQL string.
create_statement = session.prepare(
    'CREATE TABLE Farmer_Market_NYC ('
    'id varchar PRIMARY KEY,'
    'County varchar,'
    'Market_Name varchar,'
    'Market_Location varchar,'
    'Address_Line_1 varchar,'
    'City varchar,'
    'State varchar,'
    'Zip varchar,'
    'Contact varchar,'
    'Phone varchar,'
    'Operation_Hours varchar,'
    'Operation_Season varchar,'
    'Operating_Months varchar,'
    'FMNP varchar,'
    'SNAP varchar,'
    'FCC_Issued varchar,'
    ' FCC_Accepted varchar,'
    'Latitude varchar,'
    'Longitude varchar,'
    'Georeference_1 varchar)'
)

In [17]:
# Run the prepared CREATE TABLE statement; as the last expression of the cell
# the returned ResultSet is what gets displayed.
session.execute(create_statement)

<cassandra.cluster.ResultSet at 0x18f9a1687c0>

In [18]:
# Prepare the parameterised INSERT for the farmers' market table: 20 columns,
# 20 positional placeholders. The adjacent literals concatenate into a
# single-line CQL string.
insert_statement = session.prepare(
    'INSERT INTO Farmer_Market_NYC ('
    'id,County, Market_Name, Market_Location, Address_Line_1, '
    'City, State, Zip, Contact, Phone, '
    'Operation_Hours, Operation_Season, Operating_Months, '
    'FMNP, SNAP, FCC_Issued, FCC_Accepted, '
    'Latitude, Longitude, Georeference_1'
    ') VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)'
)
# Non-null count per column, displayed as the cell output.
farmer_result.count()

view                  0
temp                405
sid                 405
id                  405
position            405
created_at          405
created_meta          0
updated_at          405
updated_meta          0
meta_data           405
County              405
Market_Name         405
Market_Location     405
Address_Line_1      405
City                405
State               405
Zip                 405
Contact             405
Phone               403
Market_Link         308
Operation_Hours     405
Operation_Season    405
Operating_Months    405
FMNP                405
SNAP                405
FCC_Issued          405
FCC_Accepted        405
Latitude            405
Longitude           405
Georeference_1      405
dtype: int64

In [19]:
from cassandra.concurrent import execute_concurrent_with_args

# Columns bound to the INSERT placeholders, in placeholder order.
farmer_insert_columns = [
    'id', 'County', 'Market_Name', 'Market_Location', 'Address_Line_1',
    'City', 'State', 'Zip', 'Contact', 'Phone',
    'Operation_Hours', 'Operation_Season', 'Operating_Months',
    'FMNP', 'SNAP', 'FCC_Issued', 'FCC_Accepted',
    'Latitude', 'Longitude', 'Georeference_1',
]

# One parameter tuple per market. itertuples avoids building a Series per row,
# which is what iterrows does.
farmer_insert_params = list(
    farmer_result[farmer_insert_columns].itertuples(index=False, name=None)
)

# Pipeline the inserts instead of waiting for one round-trip per row. With the
# default raise_on_first_error=True a failed insert still raises, as the
# sequential loop did. The result list is bound to a name so the notebook does
# not echo one entry per row.
farmer_insert_results = execute_concurrent_with_args(
    session, insert_statement, farmer_insert_params, concurrency=50
)

# Get data from Docker to Python

In [11]:
# Fetch every row of the farmer_market_nyc table from Cassandra.
farmer_query = "SELECT * FROM farmer_market_nyc"
rows = session.execute(farmer_query)

# Materialise the result rows and load them into a DataFrame.
farmer_df = pd.DataFrame(list(rows))

In [12]:
# Preview the first rows of the farmers' market frame read back from Cassandra.
farmer_df.head()

Unnamed: 0,id,address_line_1,city,contact,county,fcc_accepted,fcc_issued,fmnp,georeference_1,latitude,longitude,market_location,market_name,operating_months,operation_hours,operation_season,phone,snap,state,zip
0,00000000-0000-0000-0918-2C33DA1769E9,3809 Old State Road,Allegany,Gina Anderson,Cattaraugus,Y,N,Y,POINT (-78.50618 42.09601),42.09601,-78.50618,3809 Old State Rd Allegany,Canticle Farm Market,YR,"Tue/Fri 12pm-6pm, Dec-June: Tue only",Year-round,7163730200,Y,NY,14706
1,00000000-0000-0000-8FBD-628827DCE1D2,602 Craig Street,Schenectady,Marissa Peck,Schenectady,Y,N,N,POINT (-73.93741 42.80323),42.80323,-73.93741,602 Craig Street Schenectady,Veggie Mobile-Hillside View Apts,YR,Fri 2:15pm-3pm,Year-round,5182748685,Y,NY,12307
2,00000000-0000-0000-E2C9-596B13C3D504,134 Southside Ave,Hastings on Huds,Pascale Le Draoulec,Westchester,Y,N,N,POINT (-73.88424 40.9943),40.9943,-73.88424,Commuter Lot Across from Metro-North Station,Hastings WINTER Farmers' Market,X/W/P,Sat 8:30am-1:30pm,"Dec 17-May 6, 2023",9148063380,Y,NY,10706
3,00000000-0000-0000-6B08-D28B708EDDA3,512 County Route 26,West Monroe,Elaine Guppy,Oswego,Y,N,Y,POINT (-76.03094 43.35052),43.35052,-76.03094,512 County Route 26 West Monroe,Guppy's Berry Farm Farm Stand,M,Tue-Sun 9am-4pm,July 20-September 30,3156257920,N,NY,13617
4,00000000-0000-0000-5B26-A865F3E0486D,East 149th St and Morris Ave,Bronx,Siobahn Keys,Bronx,Y,N,N,POINT (-73.92276 40.8174),40.8174,-73.92276,234 E 149th St btwn Park and Morris Aves,Lincoln Hospital Fri Greenmarket,YR,Fri 8am-3pm,Year-round,2127887476,Y,NY,10451


In [13]:
# (rows, columns) of the farmers' market frame read back from Cassandra.
farmer_df.shape

(405, 20)

In [32]:
# Fetch every row of the retail_food_stores table from Cassandra.
retail_query = "SELECT * FROM retail_food_stores"
retail_rows = session.execute(retail_query)

# Materialise the result rows and load them into a DataFrame.
retail_df = pd.DataFrame(list(retail_rows))

In [33]:
# Preview the first rows of the retail stores frame read back from Cassandra.
retail_df.head()

Unnamed: 0,id,address_line_2,address_line_3,city,county,dba_name,entity_name,establishment_type,georeference,license_number,nys_municipal_boundaries,operation_type,square_footage,state,street_name,street_number,zip_code
0,00000000-0000-0000-F3CA-DAD1E98512DF,,,WILLIAMSON,Wayne,DOLLAR GENERAL 9109,DOLGEN NEW YORK LLC,A,POINT (-77.19454 43.23266),540140,626,Store,6100,NY,RT 104 PO BOX 181,3906,14589
1,00000000-0000-0000-EF13-E0BBC1BF03AF,,,BROOKLYN,Kings,KINGS HAMILTON AVE MOBI,BOLLA OPERATING CORP,JAC,POINT (-73.9993 40.67315),710066,894,Store,1200,NY,HAMILTON AVE,375,11231
2,00000000-0000-0000-398D-902D9D1BAA68,,,PEARL RIVER,Rockland,KP FOOD MART,LAM KAM PAN,JAC,POINT (-74.02156 41.06017),390201,530,Store,1000,NY,N MAIN ST,27,10965
3,00000000-0000-0000-E20F-768D1855F008,,,WEBSTER,Monroe,MALCHO'S 480 PLANK RD,MALCHO'S 480 PLANK ROAD LLC,JAC,POINT (-77.5073 43.18276),707722,541,Store,1500,NY,PLANK RD,480,14580
4,00000000-0000-0000-F4B2-2CC1C6AA5A4A,,,BROOKLYN,Kings,MALKO KARKANNI BROS,MALKO KARKANNI BROS INC,JABCD,POINT (-73.99367 40.69004),616603,894,Store,1800,NY,ATLANTIC AVE,174,11201


In [121]:
# (rows, columns) of the retail stores frame read back from Cassandra.
retail_df.shape

(28520, 17)

# Transform Data

In [30]:
import luigi
from luigi import build
import pandas as pd

In [31]:
class CassandraTask(luigi.Task):
    """Write the farmers' market frame to a comma-delimited text file.

    The task drops the location columns from the notebook-level ``farmer_df``,
    renames the remaining columns to the CamelCase names used by the PostgreSQL
    ``Farmer`` table, and writes a header line followed by one comma-separated
    line per row.

    NOTE(review): ``keyspace``, ``table``, ``query`` and ``host`` are accepted
    but never read by ``run``; the task relies on the global ``farmer_df``
    having been populated by an earlier cell.
    """

    keyspace = luigi.Parameter()
    table = luigi.Parameter()
    query = luigi.Parameter()
    host = luigi.Parameter(default='127.0.0.1')
    # Destination file. The default is the location this task has always
    # written to; pass output_path=... to write somewhere else.
    output_path = luigi.Parameter(default="C:\\Users\\ketak\\OneDrive\\Documents\\cassandra_data_farmer.csv")

    # Columns removed from farmer_df before the export.
    DROPPED_COLUMNS = ['city', 'county', 'georeference_1', 'latitude', 'longitude', 'state', 'zip']

    # Cassandra column name -> column name written to the export header.
    COLUMN_RENAMES = {
        'id': 'ID',
        'address_line_1': 'AddressLine1',
        'contact': 'ContactName',
        'fcc_accepted': 'FCCAccepted',
        'fcc_issued': 'FCCIssued',
        'fmnp': 'FMNP',
        'market_location': 'MarketLocation',
        'market_name': 'MarketName',
        'operating_months': 'OperatingMonths',
        'operation_hours': 'OperationHours',
        'operation_season': 'OperationSeason',
        'phone': 'PhoneNumber',
        'snap': 'SNAP',
    }

    @staticmethod
    def _copy_field(value):
        """Render one cell as a single field of the comma-delimited export.

        Missing values become ``\\N``, which psycopg2's ``copy_from`` reads as
        NULL by default; previously they were written as the literal text
        ``None``. Commas (the field separator) and ``#`` are replaced by spaces.
        """
        if pd.isna(value):
            return '\\N'
        return str(value).replace(',', ' ').replace('#', ' ')

    def output(self):
        """Return the local file target the export is written to."""
        return luigi.LocalTarget(self.output_path)

    def run(self):
        """Build the export frame from ``farmer_df`` and write it to ``output()``."""
        # Drop unused columns and switch to the export column names.
        processed_df_farmer = (
            farmer_df
            .drop(columns=self.DROPPED_COLUMNS)
            .rename(columns=self.COLUMN_RENAMES)
        )
        # Report missing values per column.
        print(processed_df_farmer.isna().sum())

        with self.output().open('w') as f:
            # Header row.
            header = ','.join(processed_df_farmer.columns) + '\n'
            print(header)
            f.write(header)

            # One line per row, fields joined with commas.
            for record in processed_df_farmer.itertuples(index=False, name=None):
                f.write(','.join(self._copy_field(value) for value in record) + '\n')

In [32]:
# Instantiate the export task with its three required parameters; host keeps its default.
task = CassandraTask(keyspace='dapdb', table='farmer_market_nyc', query='SELECT * FROM farmer_market_nyc')

In [33]:
# Run the task with Luigi's local scheduler; the call's return value is shown as the cell output.
build([task], local_scheduler=True)

DEBUG: Checking if CassandraTask(keyspace=dapdb, table=farmer_market_nyc, query=SELECT * FROM farmer_market_nyc, host=127.0.0.1) is complete
INFO: Informed scheduler that task   CassandraTask_127_0_0_1_dapdb_SELECT___FROM_fa_c5305910e3   has status   PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 19392] Worker Worker(salt=6798687159, workers=1, host=Ketaki, username=ketak, pid=19392) running   CassandraTask(keyspace=dapdb, table=farmer_market_nyc, query=SELECT * FROM farmer_market_nyc, host=127.0.0.1)
INFO: [pid 19392] Worker Worker(salt=6798687159, workers=1, host=Ketaki, username=ketak, pid=19392) done      CassandraTask(keyspace=dapdb, table=farmer_market_nyc, query=SELECT * FROM farmer_market_nyc, host=127.0.0.1)
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   CassandraTask_127_0_0_1_dapdb_SELECT___FROM_fa_c5305910e3   has status   DONE
DEBU

ID                 0
AddressLine1       0
ContactName        0
FCCAccepted        0
FCCIssued          0
FMNP               0
MarketLocation     0
MarketName         0
OperatingMonths    0
OperationHours     0
OperationSeason    0
PhoneNumber        2
SNAP               0
dtype: int64
ID,AddressLine1,ContactName,FCCAccepted,FCCIssued,FMNP,MarketLocation,MarketName,OperatingMonths,OperationHours,OperationSeason,PhoneNumber,SNAP



True

In [27]:
# Close the Cassandra session opened with cluster.connect('dapdb').
session.shutdown()

# Then shut down the cluster object that owns the connection.
cluster.shutdown()

# Postgres Connection

In [18]:
import csv 
import psycopg2 
import matplotlib.pyplot as plt 
import pandas.io.sql as sqlio 
import seaborn as sns 
from sqlalchemy import create_engine, event, text, exc 
from sqlalchemy.engine.url import URL

In [19]:
# NOTE(review): the password is embedded in this URL; consider reading it from the environment.
connection_string = "postgresql+psycopg2://postgres:testpass@127.0.0.1:5432/postgres"

# Smoke-test the connection to the default 'postgres' database by asking the
# server for its version string.
engine = None  # bound up front so the finally block can always test it
try:
    engine = create_engine(connection_string)
    with engine.connect() as connection:
        server_version = sqlio.read_sql_query(
            text("SELECT VERSION();"),
            connection
        )
except exc.SQLAlchemyError as dbError:
    print("PostgreSQL Error", dbError)
else:
    print(server_version["version"].values[0])
finally:
    # The previous check, `engine in locals()`, compared the Engine object with
    # the variable-name strings and was always False, so nothing was released.
    # Engine has no close(); dispose() releases its pooled connections.
    if engine is not None:
        engine.dispose()

PostgreSQL 15.2 (Debian 15.2-1.pgdg110+1) on x86_64-pc-linux-gnu, compiled by gcc (Debian 10.2.1-6) 10.2.1 20210110, 64-bit


In [20]:
# Create the project database, using the connection_string defined in the
# previous cell. CREATE DATABASE cannot run inside a transaction block, hence
# AUTOCOMMIT. The unquoted name is folded to lower case by PostgreSQL, which
# is why later URLs use 'dapdb_postgres'.
engine = None  # bound up front so the finally block can always test it
try:
    engine = create_engine(connection_string)
    with engine.connect() as connection:
        connection.execution_options(isolation_level="AUTOCOMMIT")
        connection.execute(text("CREATE DATABASE Dapdb_postgres;"))
except exc.SQLAlchemyError as dbError:
    print("PostgreSQL Error", dbError)
finally:
    # The previous check, `engine in locals()`, compared the Engine object with
    # the variable-name strings and was always False, so nothing was released.
    # Engine has no close(); dispose() releases its pooled connections.
    if engine is not None:
        engine.dispose()

In [22]:
# NOTE(review): the password is embedded in this URL; consider reading it from the environment.
connection_string = "postgresql+psycopg2://postgres:testpass@127.0.0.1:5432/dapdb_postgres"

# Smoke-test the connection to the newly created 'dapdb_postgres' database by
# asking the server for its version string.
engine = None  # bound up front so the finally block can always test it
try:
    engine = create_engine(connection_string)
    with engine.connect() as connection:
        server_version = sqlio.read_sql_query(
            text("SELECT VERSION();"),
            connection
        )
except exc.SQLAlchemyError as dbError:
    print("PostgreSQL Error", dbError)
else:
    print(server_version["version"].values[0])
finally:
    # The previous check, `engine in locals()`, compared the Engine object with
    # the variable-name strings and was always False, so nothing was released.
    # Engine has no close(); dispose() releases its pooled connections.
    if engine is not None:
        engine.dispose()

PostgreSQL 15.2 (Debian 15.2-1.pgdg110+1) on x86_64-pc-linux-gnu, compiled by gcc (Debian 10.2.1-6) 10.2.1 20210110, 64-bit


In [54]:
# NOTE(review): the password is embedded in this URL; consider reading it from the environment.
connection_string = "postgresql+psycopg2://postgres:testpass@127.0.0.1:5432/dapdb_postgres"
# Despite its name this variable holds the DROP statement. IF EXISTS keeps a
# first run against an empty database from reporting an error.
table_create_string = """ 
DROP TABLE IF EXISTS farmer
    """
engine = None  # bound up front so the finally block can always test it
try:
    engine = create_engine(connection_string)
    with engine.connect() as connection:
        connection.execution_options(isolation_level="AUTOCOMMIT")
        connection.execute(text(table_create_string))
except exc.SQLAlchemyError as dbError:
    print("PostgreSQL Error", dbError)
finally:
    # The previous check, `engine in locals()`, compared the Engine object with
    # the variable-name strings and was always False, so nothing was released.
    # Engine has no close(); dispose() releases its pooled connections.
    if engine is not None:
        engine.dispose()

In [55]:
# NOTE(review): the password is embedded in this URL; consider reading it from the environment.
connection_string = "postgresql+psycopg2://postgres:testpass@127.0.0.1:5432/dapdb_postgres"
# Target table for the farmers' market export. PhoneNumber is the only
# nullable column.
table_create_string = """ 
CREATE TABLE Farmer( 
    ID varchar(150) NOT NULL, 
    AddressLine1 varchar(100) NOT NULL, 
    ContactName VARCHAR(50) NOT NULL, 
    FCCAccepted boolean NOT NULL, 
    FCCIssued boolean NOT NULL,
    FMNP boolean NOT NULL, 
    MarketLocation VARCHAR(100) NOT NULL,
    MarketName VARCHAR(100) NOT NULL, 
    OperatingMonths VARCHAR(10) NOT NULL, 
    OperationHours VARCHAR(100) NOT NULL, 
    OperationSeason VARCHAR(100) NOT NULL, 
    PhoneNumber VARCHAR(10) NULL,  
    SNAP boolean NOT NULL
    ); 
    """
engine = None  # bound up front so the finally block can always test it
try:
    engine = create_engine(connection_string)
    with engine.connect() as connection:
        connection.execution_options(isolation_level="AUTOCOMMIT")
        connection.execute(text(table_create_string))
except exc.SQLAlchemyError as dbError:
    print("PostgreSQL Error", dbError)
finally:
    # The previous check, `engine in locals()`, compared the Engine object with
    # the variable-name strings and was always False, so nothing was released.
    # Engine has no close(); dispose() releases its pooled connections.
    if engine is not None:
        engine.dispose()

In [66]:
# NOTE(review): the password is embedded in this URL; consider reading it from the environment.
connection_string = "postgresql+psycopg2://postgres:testpass@127.0.0.1:5432/dapdb_postgres"
# Despite its name this variable holds the DROP statement. IF EXISTS keeps a
# first run against an empty database from reporting an error.
table_create_string = """ 
DROP TABLE IF EXISTS location
    """
engine = None  # bound up front so the finally block can always test it
try:
    engine = create_engine(connection_string)
    with engine.connect() as connection:
        connection.execution_options(isolation_level="AUTOCOMMIT")
        connection.execute(text(table_create_string))
except exc.SQLAlchemyError as dbError:
    print("PostgreSQL Error", dbError)
finally:
    # The previous check, `engine in locals()`, compared the Engine object with
    # the variable-name strings and was always False, so nothing was released.
    # Engine has no close(); dispose() releases its pooled connections.
    if engine is not None:
        engine.dispose()

In [67]:
# NOTE(review): the password is embedded in this URL; consider reading it from the environment.
connection_string = "postgresql+psycopg2://postgres:testpass@127.0.0.1:5432/dapdb_postgres"
# Target table for the location export; every column is NOT NULL.
table_create_string = """ 
CREATE TABLE Location( 
    County varchar(50) NOT NULL, 
    City varchar(50) NOT NULL, 
    State VARCHAR(50) NOT NULL, 
    Zip INTEGER NOT NULL, 
    GeoReference VARCHAR(100) NOT NULL,
    StreetName VARCHAR(50) NOT NULL, 
    StreetNumber VARCHAR(50) NOT NULL
    ); 
    """
engine = None  # bound up front so the finally block can always test it
try:
    engine = create_engine(connection_string)
    with engine.connect() as connection:
        connection.execution_options(isolation_level="AUTOCOMMIT")
        connection.execute(text(table_create_string))
except exc.SQLAlchemyError as dbError:
    print("PostgreSQL Error", dbError)
finally:
    # The previous check, `engine in locals()`, compared the Engine object with
    # the variable-name strings and was always False, so nothing was released.
    # Engine has no close(); dispose() releases its pooled connections.
    if engine is not None:
        engine.dispose()

In [73]:
# Open a direct psycopg2 connection and a cursor on it; the cursor is what the
# following cells call copy_from on.
# NOTE(review): the password is embedded in the DSN; consider reading it from the environment.
conn = psycopg2.connect("postgresql://postgres:testpass@127.0.0.1:5432/dapdb_postgres")
cur = conn.cursor()

In [57]:
# Bulk-load the farmer export into the 'farmer' table.
# NOTE(review): the Luigi task writes this file to an absolute path; confirm
# this relative name resolves to the same file.
with open('cassandra_data_farmer.csv', 'r') as farmer_csv:
    # Consume the header line so only data rows reach COPY.
    next(farmer_csv)
    cur.copy_from(farmer_csv, 'farmer', sep=',')

In [74]:
# Bulk-load the location export into the 'location' table.
# NOTE(review): no visible cell of this notebook writes cassandra_location.csv;
# confirm where it is produced.
with open('cassandra_location.csv', 'r') as location_csv:
    # Consume the header line so only data rows reach COPY.
    next(location_csv)
    cur.copy_from(location_csv, 'location', sep=',')

In [75]:
# Commit the rows loaded through copy_from, then release the cursor and the connection.
conn.commit()
cur.close()
conn.close()