In [None]:
from __future__ import absolute_import

import argparse
import logging
import os
import re

import apache_beam as beam
import google.auth
from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.runners import DataflowRunner
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
from apache_beam.runners.runner import PipelineState
from IPython.display import HTML, display

In [None]:

def Split(element):
    """Split one CSV line into its list of field values.

    The line is parsed with the ``csv`` module, so a quoted field containing a
    comma (e.g. ``"Title, Part 2"``) stays in one column instead of shifting
    every later column. For lines without quotes the result is identical to
    ``element.split(",")``.

    Args:
        element: a single line of text read from the CSV file.

    Returns:
        list of str: the field values. An empty line yields ``['']``, just as
        ``str.split`` would.
    """
    # Imported locally so the function still works when Beam pickles it for
    # remote workers that do not restore the notebook's global imports.
    import csv

    fields = next(csv.reader([element]))
    # csv.reader reports an empty line as an empty row; keep str.split's [''].
    return fields if fields else [element]

In [None]:
# Returns a one-element list holding the tuple (1, Rating).
# The constant key 1 puts every rating under the same key, giving the
# (key, value) shape that GroupByKey takes as input.

def ExtractRating(element):
    """Pair the element's 'Rating' value with the constant key 1.

    Args:
        element: a mapping that has a 'Rating' entry.

    Returns:
        list: ``[(1, element['Rating'])]``.
    """
    key = 1
    return [(key, element['Rating'])]

In [None]:
# Returns a one-element list holding the tuple (1, Name).
# The constant key 1 puts every name under the same key, giving the
# (key, value) shape that GroupByKey takes as input.

def ExtractName(element):
    """Pair the element's 'Name' value with the constant key 1.

    Args:
        element: a mapping that has a 'Name' entry.

    Returns:
        list: ``[(1, element['Name'])]``.
    """
    key = 1
    return [(key, element['Name'])]

In [None]:
# Predicate that keeps only elements belonging to the requested genre.
def FilterBasedonGenre(GenreName, element):
    """Return True when ``element['Genre']`` equals ``GenreName``.

    Args:
        GenreName: the genre to keep, e.g. "Fiction".
        element: a mapping with a 'Genre' key.

    NOTE(review): rows produced by ``Split`` are lists, not mappings, and
    ``beam.Filter(fn, *args)`` passes the element as the first argument, so
    wiring this into a pipeline would need e.g. ``functools.partial`` to bind
    ``GenreName`` first — confirm before use.
    """
    genre = element['Genre']
    return genre == GenreName

In [None]:
def FormatText(elem):
    """Render the second item of ``elem`` as an average-rating label.

    Args:
        elem: an indexable whose item at index 1 is the value to show.

    Returns:
        str: 'AVERAGE RATING OF BOOKS:' immediately followed by ``str(elem[1])``.
    """
    average = elem[1]
    return ''.join(['AVERAGE RATING OF BOOKS:', str(average)])

In [None]:
class AverageFn(beam.CombineFn):
    """Weighted mean of ``(value, count)`` pairs, for use with ``beam.CombineGlobally``.

    Each input element is a ``(value, count)`` pair; the value is converted with
    ``float`` and weighted by ``count``. In this notebook the pairs are
    ``(rating text, number of books with that rating)``, as produced by
    ``CombinePerKey(sum)``. The accumulator is ``(weighted_sum, total_count)``.
    """

    def create_accumulator(self):
        """Return the empty accumulator ``(sum, count)``."""
        return (0.0, 0)

    def add_input(self, sum_count, inputt):
        """Fold one ``(value, count)`` pair into the accumulator.

        ``value`` may be text (a raw CSV field), hence the ``float`` conversion;
        it contributes ``value * count`` to the sum and ``count`` to the total.
        """
        summ, count = sum_count
        value, weight = inputt
        return summ + float(value) * weight, count + weight

    def merge_accumulators(self, accumulators):
        """Sum partial ``(sum, count)`` accumulators component-wise.

        e.g. [(27, 3), (39, 3), (18, 2)] -> (84, 8). An empty iterable yields
        the empty accumulator instead of raising ValueError (as ``zip(*[])``
        unpacking would).
        """
        total, count = self.create_accumulator()
        for part_sum, part_count in accumulators:
            total += part_sum
            count += part_count
        return total, count

    def extract_output(self, sum_count):
        """Return ``sum / count``, or NaN when no input was seen."""
        (summ, count) = sum_count
        return summ / count if count else float('NaN')

In [None]:
def f_mean(element):
    """Scale a ``(value, count)`` pair into ``(value * count, count)``."""
    value, count = element
    return value * count, count
def accumulate(element):
    """Sum an iterable of ``(sum, count)`` pairs into one ``(sum, count)`` pair.

    e.g. [(27, 3), (39, 3), (18, 2)] -> (84, 8). An empty iterable yields
    ``(0, 0)`` instead of raising ValueError from ``zip(*[])`` unpacking.
    """
    total, count = 0, 0
    for part_sum, part_count in element:
        total += part_sum
        count += part_count
    return total, count
def op(s_count):
    """Return ``sum / count`` for a ``(sum, count)`` pair, or NaN when count is 0.

    The previous body unpacked the undefined name ``ind_counts`` and tested the
    undefined ``count``, so every call raised NameError; it now uses its argument.
    """
    summ2, c2 = s_count
    return summ2 / c2 if c2 else float('NaN')

In [None]:
# Two independent pipelines built on the interactive runner:
#   p  - the Fiction average-rating job, later submitted to Dataflow
#   p2 - the CSV -> BigQuery load, run directly from the notebook
# (The Dataflow options are configured in the next cell.)
p = beam.Pipeline(InteractiveRunner())
p2 = beam.Pipeline(InteractiveRunner())

In [None]:
# Apache Beam pipeline options, built from an explicit (empty) flag list.
options = pipeline_options.PipelineOptions(flags=[])
gcloud_opts = options.view_as(GoogleCloudOptions)

# google.auth.default() returns (credentials, project_id); keep only the
# project id, i.e. the default project of the current Google Cloud environment.
_, gcloud_opts.project = google.auth.default()

# Google Cloud region in which the Dataflow job runs.
gcloud_opts.region = 'us-central1'

In [None]:
dataflow_gcs_location = "gs://proj_mtree/dataflow"  # root GCS folder for Dataflow staging/temp paths

In [None]:
gcloud_opts = options.view_as(GoogleCloudOptions)

# Dataflow staging location, under the notebook's GCS folder.
gcloud_opts.staging_location = '%s/staging' % dataflow_gcs_location

# Dataflow temp location: temporary files and intermediate results are kept
# here before the final output is written to the sink.
gcloud_opts.temp_location = '%s/temp' % dataflow_gcs_location

In [None]:
# BigQuery dataset ("<project>.<dataset>") holding the books table written below.
# NOTE: WriteToBigQuery's CREATE_IF_NEEDED creates the table, not the dataset,
# so this dataset must already exist (create it once, e.g. with the `bq` CLI or
# the google-cloud-bigquery client). The Beam pipeline itself needs no
# BigQuery client object.
dataset_id = "mind10.flowtobq"

def to_json(csv_str):
    """Convert one line of the books CSV into a row dict for WriteToBigQuery.

    The line is parsed with the ``csv`` module so a quoted field containing a
    comma (e.g. ``"Title, Part 2"``) stays in one column; for lines without
    quotes this matches ``csv_str.split(',')``. Values are left as strings.

    Args:
        csv_str: one line with at least 7 comma-separated columns in the order
            Name, Author, User_Rating, Reviews, Price, Year, Genre.

    Returns:
        dict: column name -> field text.

    Raises:
        IndexError: if the line has fewer than 7 columns.
    """
    # Imported locally so the function still works when Beam pickles it for
    # remote workers that do not restore the notebook's global imports.
    import csv

    fields = next(csv.reader([csv_str]))
    return {
        "Name": fields[0],
        "Author": fields[1],
        "User_Rating": fields[2],
        "Reviews": fields[3],
        "Price": fields[4],
        "Year": fields[5],
        "Genre": fields[6],
    }

# BigQuery schema in Beam's "name:TYPE,..." string form; type names are written
# uniformly in upper case (the canonical BigQuery spelling).
table_schema = 'Name:STRING,Author:STRING,User_Rating:FLOAT,Reviews:INTEGER,Price:INTEGER,Year:INTEGER,Genre:STRING'

# Read the cleaned CSV, turn each line into a row dict and load it into the
# BigQuery table mind10:flowtobq.t2 (table created from table_schema if missing).
# NOTE: WRITE_APPEND means every re-run of this cell + the run cell appends the
# same rows again.
bs = p2 | beam.io.ReadFromText("gs://rr005/clean_books_amazon.csv")
(
    bs
    | 'cleaned_data to json' >> beam.Map(to_json)
    | 'write to bigquery' >> beam.io.WriteToBigQuery(
        "mind10:flowtobq.t2",
        schema=table_schema,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        # Same temp folder as the Dataflow temp_location, derived from the shared
        # GCS root instead of repeating the bucket path.
        custom_gcs_temp_location='%s/temp' % dataflow_gcs_location,
    )
)

# Run the BigQuery-load pipeline and report whether it finished successfully.
ret = p2.run()
# Make sure the run has reached a terminal state before inspecting it.
ret.wait_until_finish()
if ret.state == PipelineState.DONE:
    print('Success!!!')
else:
    print('Error Running beam pipeline')

In [None]:
# Load the books CSV into pipeline `p` and split every line into its column values.
books = (
    p
    | beam.io.ReadFromText("gs://rr005/clean_books_amazon.csv")
    | beam.Map(Split)
)

In [None]:
# Weighted mean user rating of Fiction books, written to GCS:
#   1. keep rows whose Genre column (index 6) is "Fiction"
#   2. emit (rating_text, 1) per book (User_Rating is column 2) and sum per
#      rating -> (rating_text, n_books)
#   3. AverageFn folds those pairs into sum(rating * n_books) / sum(n_books)
res1 = (
    books
    | beam.Filter(lambda rec: rec[6] == "Fiction")
    | beam.Map(lambda rec: (rec[2], 1))
    | "Grouping keys" >> beam.CombinePerKey(sum)
    | "Combine Globally" >> beam.CombineGlobally(AverageFn())
    | "write" >> beam.io.WriteToText("gs://proj_mtree/Fiction_Result1")
)

In [None]:
# Submit pipeline `p` to Cloud Dataflow using the GoogleCloudOptions configured above.
dataflow_runner = DataflowRunner()
pipeline_result = dataflow_runner.run_pipeline(p, options=options)

In [None]:
# Show a clickable link to the submitted Dataflow job in the Cloud Console.
# NOTE: location and projectId are read from the result's private `_job`
# attribute; the job id uses the public job_id() accessor.
url = ('https://console.cloud.google.com/dataflow/jobs/%s/%s?project=%s' %
       (pipeline_result._job.location, pipeline_result.job_id(), pipeline_result._job.projectId))
display(HTML('Click <a href="%s" target="_new">here</a> for the details of your Dataflow job!' % url))