# In [None]:
import argparse
import logging
import re
import csv
import io
import apache_beam as beam
from apache_beam.io import fileio
from apache_beam.options.pipeline_options import PipelineOptions
import pandas as pd

# Load the raw Airbnb NYC 2019 listings CSV from Cloud Storage.
# Bound to ``listings`` (not ``csv``) so the stdlib ``csv`` module imported
# above is not shadowed.
listings = pd.read_csv(r'gs://shri_airbnb0410/AB_NYC_2019.csv')

# Fill gaps in the review columns. Results are assigned back instead of using
# ``column.fillna(..., inplace=True)``: an in-place edit through a column
# selection is chained assignment, which pandas deprecates and which no longer
# modifies the frame under Copy-on-Write (the pandas 3.0 default).
listings['last_review'] = listings['last_review'].fillna('7/20/2019')
listings['reviews_per_month'] = listings['reviews_per_month'].fillna(0)

# Wrap every listing name in double quotes; a missing (NaN) name becomes the
# literal string '"nan"'.
# NOTE(review): because names now contain '"', ``to_csv`` below (default
# QUOTE_MINIMAL) will quote these fields and double the embedded quotes, and
# the downstream comma split does not unquote them -- confirm this is intended.
listings['name'] = listings['name'].map('"{}"'.format)

# Drop line breaks and commas from names: the cleaned file is later read line
# by line and split on ',', so either character would break a row apart.
listings['name'] = listings['name'].replace(r'[\n\r,]', '', regex=True)
# Commas in these free-text columns are mapped to ':' for the same reason.
listings['host_name'] = listings['host_name'].replace(',', ':', regex=True)
listings['neighbourhood'] = listings['neighbourhood'].replace(
    ',', ':', regex=True)
# Semicolons in names are also mapped to ':'.
listings['name'] = listings['name'].replace(';', ':', regex=True)

# Fix the column order and write the cleaned copy that ``run`` reads by default.
df = pd.DataFrame(listings, columns = ['id','name','host_id','host_name','neighbourhood_group','neighbourhood','latitude','longitude','room_type','price','minimum_nights','number_of_reviews','last_review','reviews_per_month','calculated_host_listings_count','availability_365'])

df.to_csv('gs://shri_airbnb0410/update1.csv',index=False)

class DataIngestion:
    """Converts lines of the cleaned listings CSV into BigQuery row dicts."""

    def parse_method(self, string_input):
        """Map one comma-separated line onto the listing column names.

        Every value is kept as a string. Pieces are paired with column
        names by position, so if the line has more or fewer pieces than
        there are columns, the surplus is silently dropped.
        """
        column_names = (
            'id', 'name', 'host_id', 'host_name', 'neighbourhood_group',
            'neighbourhood', 'latitude', 'longitude', 'room_type', 'price',
            'minimum_nights', 'number_of_reviews', 'last_review',
            'reviews_per_month', 'calculated_host_listings_count',
            'availability_365',
        )
        # A plain literal split; no regex machinery is needed for one comma.
        pieces = string_input.split(",")
        return dict(zip(column_names, pieces))


# BigQuery schema of the listings table loaded by ``run``.
_LISTINGS_SCHEMA = (
    'id:INTEGER,name:STRING,host_id:INTEGER,host_name:STRING,'
    'neighbourhood_group:STRING,neighbourhood:STRING,'
    'latitude:FLOAT,longitude:FLOAT,room_type:STRING,price:INTEGER,'
    'minimum_nights:INTEGER,number_of_reviews:INTEGER,'
    'last_review:STRING,reviews_per_month:FLOAT,'
    'calculated_host_listings_count:INTEGER,availability_365:INTEGER'
)


def _parse_args(argv):
    """Split ``argv`` into this script's options and the Beam pipeline args."""
    parser = argparse.ArgumentParser()

    parser.add_argument(
        '--input',
        dest='input',
        required=False,
        help='Input file to read. This can be a local file or '
        'a file in a Google Storage Bucket.',
        default='gs://shri_airbnb0410/update1.csv')

    parser.add_argument('--output',
                        dest='output',
                        required=False,
                        help='Output BQ table to write results to.',
                        default='airbnb.New_update1')
    parser.add_argument('--output2',
                        dest='output2',
                        required=False,
                        help='Output BQ table to write results to.',
                        default='airbnb.New_transformed')
    # NOTE(review): the default is the table the original code hard-coded; it
    # is NOT the ``--output`` table loaded by the first pipeline. Pass
    # ``--source_table <project>.<dataset>.<table>`` to aggregate the fresh load.
    parser.add_argument('--source_table',
                        dest='source_table',
                        required=False,
                        help='Fully qualified BigQuery table '
                        '(project.dataset.table) to count neighbourhoods from.',
                        default='canvas-replica-309317.airbnb.update1')

    return parser.parse_known_args(argv)


def _load_listings(known_args, pipeline_args):
    """Load the cleaned CSV at ``--input`` into the ``--output`` BQ table.

    The header line is skipped, each remaining line is parsed with
    ``DataIngestion.parse_method``, and the table is created if needed and
    truncated before writing. Returns once the pipeline has finished.
    """
    data_ingestion = DataIngestion()
    # NOTE(review): BigQuerySink/BigQuerySource are deprecated in recent Beam
    # releases in favour of WriteToBigQuery/ReadFromBigQuery; migrating needs a
    # GCS temp location, so the original APIs are kept here.
    # Leaving the ``with`` block runs the pipeline and waits for it to finish.
    with beam.Pipeline(options=PipelineOptions(pipeline_args)) as p:
        (p
         | 'Read from a File' >> beam.io.ReadFromText(
             known_args.input, skip_header_lines=1)
         | 'String To BigQuery Row' >> beam.Map(data_ingestion.parse_method)
         | 'Write to BigQuery' >> beam.io.Write(
             beam.io.BigQuerySink(
                 known_args.output,
                 schema=_LISTINGS_SCHEMA,
                 create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                 write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)))


def _write_neighbourhood_counts(known_args, pipeline_args):
    """Write per-neighbourhood listing counts into the ``--output2`` BQ table.

    Counts are computed with a standard-SQL query over ``--source_table``.
    The destination table is created if needed and truncated before writing.
    Returns once the pipeline has finished.
    """
    # The space before GROUP BY was missing when the two literals were
    # concatenated in the original code.
    # NOTE(review): ORDER BY does not make the written table ordered.
    query = (
        'SELECT neighbourhood, COUNT(neighbourhood) as count '
        f'FROM `{known_args.source_table}` '
        'GROUP BY neighbourhood ORDER BY count DESC')
    with beam.Pipeline(options=PipelineOptions(pipeline_args)) as p:
        (p
         | 'Read from a BigQuery' >> beam.io.Read(
             beam.io.BigQuerySource(query=query, use_standard_sql=True))
         | 'Another Table' >> beam.io.Write(
             beam.io.BigQuerySink(
                 known_args.output2,
                 schema='neighbourhood:STRING,count:INTEGER',
                 create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                 write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)))


def run(argv=None):
    """The main function which creates the pipeline and runs it.

    It runs two Beam pipelines one after the other. The first loads the
    cleaned CSV into ``--output``. After it finishes, the second writes
    neighbourhood counts from ``--source_table`` into ``--output2``.
    Arguments this script does not recognise are passed to
    ``PipelineOptions``.

    Args:
        argv: Command-line arguments; ``None`` means ``sys.argv[1:]``.
    """
    known_args, pipeline_args = _parse_args(argv)
    _load_listings(known_args, pipeline_args)
    _write_neighbourhood_counts(known_args, pipeline_args)

if __name__ == '__main__':
    # Lower the root logger's threshold to INFO before running the pipelines.
    logging.root.setLevel(logging.INFO)
    run()