In [None]:
import apache_beam as beam
import pandas as pd
import glob
import os

from google.cloud import bigquery
from google.cloud import storage

import google.auth

from datetime import datetime

from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.runners import DataflowRunner

import apache_beam.runners.interactive.interactive_beam as ib
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner

To access data in our csv files, it needs to be put on a bigquery table first.

## Pushing data in csv files to a bigquery table  

In [6]:
bq_client = bigquery.Client()

### Create Dataset

In [7]:
dataset = bigquery.Dataset('text-analysis-323506.dataflow_dataset')

In [8]:
dataset.location = "us-east1"

In [9]:
dataset = bq_client.create_dataset(dataset, timeout=30)

### Create Table 

In [26]:
schema = [
        bigquery.SchemaField("Series_reference", "STRING", mode="NULLABLE"),
        bigquery.SchemaField("Period", "FLOAT", mode="NULLABLE"),
        bigquery.SchemaField("Data_value", "FLOAT", mode="NULLABLE"),
        bigquery.SchemaField("STATUS", "STRING", mode="NULLABLE"),
        bigquery.SchemaField("UNITS", "STRING", mode="NULLABLE"),
        bigquery.SchemaField("MAGNTUDE", "INTEGER", mode="NULLABLE"),
        bigquery.SchemaField("Subject", "STRING", mode="NULLABLE"),
        bigquery.SchemaField("Group", "STRING", mode="NULLABLE"),
        bigquery.SchemaField("Series_title_1", "STRING", mode="NULLABLE"),
        bigquery.SchemaField("Series_title_2", "STRING", mode="NULLABLE"),
        bigquery.SchemaField("Series_title_3", "STRING", mode="NULLABLE"),
        bigquery.SchemaField("Series_title_4", "STRING", mode="NULLABLE"),
        bigquery.SchemaField("Series_title_5", "STRING", mode="NULLABLE"),
    ]

In [27]:
table_id = 'text-analysis-323506.dataflow_dataset.df_table'

In [28]:
table = bigquery.Table(table_id, schema=schema)

In [29]:
table = bq_client.create_table(table)

## Insert data into table

In [30]:
file_list = glob.glob('./data/*.csv')
gcs_dir = 'gs://text-analysis-323506/data'

In [31]:
# Job Config

job_config = bigquery.LoadJobConfig(
    schema=schema,
    skip_leading_rows=1,
    # The source format defaults to CSV, so the line below is optional.
    source_format=bigquery.SourceFormat.CSV,
    # WRITE_TRUNCATE replaces existing data 
    write_disposition=bigquery.WriteDisposition.WRITE_APPEND
)

In [32]:
for file in file_list:
    os.system(f'gsutil -m cp {file} {gcs_dir}')
    gcs_uri = os.path.join(gcs_dir, os.path.basename(file))
    
    load_job = bq_client.load_table_from_uri(
        gcs_uri, table_id, job_config=job_config, 
    )
    
    load_job.result()  # Waits for the job to complete.

In [33]:
destination_table = bq_client.get_table(table_id)
print("Loaded {} rows.".format(destination_table.num_rows))

Loaded 1318215 rows.


#### Data is in BigQuery table

In [44]:
count = 0
def get_info(df):
    global count
    count = count + 1
    return {'df_num': count, 'columns': list(df.columns), 'shape': df.shape}

In [45]:
def list_files(filepath: str):
    if filepath.startswith('gs://'):
        os.system(f'gsutil -m cp {filepath} /tmp/')
        return glob.glob(os.path.join('/tmp/', os.path.basename(filepath)))
    else:
        return glob.glob(filepath)

In [46]:
# Create pipeline object
p = beam.Pipeline(InteractiveRunner())

In [47]:
# Add pipeline components
csv_details =   (
                    p 
                    | 'List csv files' >> beam.Create(list_files('gs://text-analysis-323506/data/*.csv'))
                    | 'Read csv files' >> beam.Map(pd.read_csv)
                    | 'Get csv details' >> beam.Map(get_info)
                 )

#### Visualize Pipeline

In [48]:
ib.show_graph(p)

##### Make sure tar file in './data' folder is un-tarred. That will yield 4 csv files.

In [24]:
start_time = datetime.now()
p.run().wait_until_finish()
end_time = datetime.now()

  bundle_processor.process_bundle(instruction_id))
  bundle_processor.process_bundle(instruction_id))
  bundle_processor.process_bundle(instruction_id))


In [25]:
ib.show(csv_details)

### Time taken when using pipeline

In [19]:
print(f"Time taken: {(end_time - start_time).total_seconds()} Seconds")

Time taken: 13.906486 Seconds


In [8]:
start_time = datetime.now()
for csv_file in glob.glob('./data/*.csv'):
    df = pd.read_csv(csv_file)
    print_info(df)
end_time = datetime.now()

  interactivity=interactivity, compiler=compiler, result=result)



Dataframe: 5
columns:  ['Series_reference', 'Period', 'Data_value', 'STATUS', 'UNITS', 'MAGNTUDE', 'Subject', 'Group', 'Series_title_1', 'Series_title_2', 'Series_title_3', 'Series_title_4', 'Series_title_5']
Shape:  (1118488, 13)


Dataframe: 6
columns:  ['Series_reference', 'Period', 'Data_value', 'STATUS', 'UNITS', 'MAGNTUDE', 'Subject', 'Group', 'Series_title_1', 'Series_title_2', 'Series_title_3', 'Series_title_4', 'Series_title_5']
Shape:  (25468, 13)



  interactivity=interactivity, compiler=compiler, result=result)



Dataframe: 7
columns:  ['STATUS', 'SER_NBR', 'Series_reference', 'Period', 'Data_value', 'UNITS', 'MAGNTUDE', 'Subject', 'Group', 'Age Group 3 brackets', 'Age Group', 'Age Group 6 brackets', 'Duration of unemployment', 'Employed and Unemployed Persons, Full-Time and Part-Time Status', 'Employment relationship', 'Employment status', 'Ethnic Single / Combination', 'Ethnic Total Response', 'Formal study status', 'Highest qualification', 'Hours Worked', 'Household Composition', 'Household Labour Force Status', 'Industry ANZSIC06', 'Industry ANZSIC06 Supplementary', 'Job', 'Job tenure', 'Labour force and education status', 'Labour Force Status', 'Main activity', 'Main job', 'Methods of seeking employment', 'Occupation ANZSCO Level 1', 'Percentage change from previous period and same period previous year', 'Persons Employed, Unemployed, Not in Labour Force (for current quarter)', 'Reason for leaving last job', 'Reason not seeking work', 'Reason not wanting work', 'Reasons not available for 

  interactivity=interactivity, compiler=compiler, result=result)



Dataframe: 8
columns:  ['Series_reference', 'Period', 'Data_value', 'STATUS', 'UNITS', 'MAGNTUDE', 'Subject', 'Group', 'Series_title_1', 'Series_title_2', 'Series_title_3', 'Series_title_4', 'Series_title_5']
Shape:  (174259, 13)



### Time taken when using python directly

In [9]:
print(f"Time taken: {(end_time - start_time).total_seconds()} Seconds")

Time taken: 12.626352 Seconds


##### Python looks slightly faster here. But when the same job is submitted to cloud dataflow, The process will run faster since it uses parallel computing in distributed systems and also it's scalable. 

## Run Pipeline as a dataflow job

In [37]:
# Setting up the Apache Beam pipeline options.
options = pipeline_options.PipelineOptions(flags=[])

# Sets the project to the default project in your current Google Cloud environment.
_, options.view_as(GoogleCloudOptions).project = google.auth.default()

# Sets the Google Cloud Region in which Cloud Dataflow runs.
options.view_as(GoogleCloudOptions).region = 'us-central1'

In [38]:
dataflow_gcs_location = 'gs://text-analysis-323506/dataflow_csv_reader'

In [39]:
# Dataflow Staging Location. This location is used to stage the Dataflow Pipeline and SDK binary.
options.view_as(GoogleCloudOptions).staging_location = '%s/staging' % dataflow_gcs_location

# Dataflow Temp Location. This location is used to store temporary files or intermediate results before finally outputting to the sink.
options.view_as(GoogleCloudOptions).temp_location = '%s/temp' % dataflow_gcs_location

In [40]:
# The directory to store the output files of the job.
output_gcs_location = '%s/output' % dataflow_gcs_location

In [41]:
# Specifying the Cloud Storage location to write `csvc_details` to,
# based on the `output_gcs_location` variable set earlier.
# (csv_details | 'Write csv details to Cloud Storage' 
#  >> beam.io.WriteToText(output_gcs_location + '/csv_details-output.txt'))

<PCollection[[41]: Write csv details to Cloud Storage/Write/WriteImpl/FinalizeWrite.None] at 0x7f8f09144b50>

In [42]:
ib.show_graph(p)

In [49]:
# Submit job
# Important: Enable dataflow api if not enabled in google cloud platform console
pipeline_result = DataflowRunner().run_pipeline(p, options=options)

