In [None]:
#Importing packages
import apache_beam as beam
import json,time
from apache_beam import window
from datetime import datetime
from apache_beam.options.pipeline_options import PipelineOptions
import os,copy
from google.cloud import bigquery

# Defining variables -- fill these in before running.
PROJECT_NAME = ''
PUBSUB_TOPIC_NAME = ''
SUBSCRIPTION_NAME = ''
OUTPUT_DIR = ""  # path of bucket (used for staging/temp locations)
runner = "DirectRunner"
job_name = "my-job"
# SCHEMA FOR BIGQUERY as a Beam-style schema string.
# NOTE(review): not referenced by the code visible here; kept for external use.
schema = 'gender: STRING, first_name: STRING, last_name: STRING, id: INTEGER'

# Path to the service-account JSON key ("add key"). The env var is only set
# when a path is supplied: overwriting it with an empty string would clobber
# any credentials already configured in the environment and is not a usable
# key path.
CREDENTIALS_PATH = r""  # add key
if CREDENTIALS_PATH:
    os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = CREDENTIALS_PATH

bigquery_client = bigquery.Client()

# Prepares a reference to the dataset
dataset_ref = bigquery_client.dataset('test')

# Destination table schema; column order here is the order rows are written in.
BQ_TABLE_SCHEMA = [
    bigquery.SchemaField('gender', 'STRING', mode='REQUIRED', description='Gender'),
    bigquery.SchemaField('first_name', 'STRING', description='Name'),
    bigquery.SchemaField('last_name', 'STRING', description='Name'),
    bigquery.SchemaField('id', 'INTEGER', description='id'),
]

# Table handle used by the DoFn (insert) and the read-back cell (fetch).
table_ref = dataset_ref.table('pubsub_test', schema=BQ_TABLE_SCHEMA)


class something(beam.DoFn):
    """Insert each decoded Pub/Sub payload into the BigQuery table ``table_ref``.

    The incoming element is the JSON-decoded message. It is expected to be a
    list of row dicts (``[{...}, {...}]``); a single dict is also accepted and
    treated as one row. Nothing is emitted downstream -- this DoFn exists only
    for its insert side effect.

    NOTE(review): ``table_ref`` is a module-level global of the driver
    notebook; confirm it is available on remote workers before running on a
    non-local runner.
    """

    # Destination column order (must match BQ_TABLE_SCHEMA). Values are looked
    # up by name in this order because a dict's iteration order (arbitrary in
    # Python 2, key-insertion order in Python 3) need not match the table.
    COLUMNS = ('gender', 'first_name', 'last_name', 'id')

    def process(self, element):
        import logging

        # A single JSON object would otherwise be iterated as its key strings.
        rows = [element] if isinstance(element, dict) else element

        # Missing keys become None (NULL) instead of shifting later columns.
        row_tuples = [
            tuple(row.get(column) for column in self.COLUMNS) for row in rows
        ]
        if not row_tuples:
            # Nothing to insert; skip the API call entirely.
            return

        # The legacy insert_data API reports per-row problems via its return
        # value rather than raising, so surface them instead of dropping them.
        errors = table_ref.insert_data(row_tuples)
        if errors:
            logging.error('BigQuery insert_data reported errors: %s', errors)
        

# Pipeline function creation
def preprocessing(argv=None):
    """Build and run the streaming Pub/Sub -> BigQuery pipeline, blocking until done.

    Reads strings from ``SUBSCRIPTION_NAME``, decodes each as JSON, assigns
    sliding windows (300 s long, starting every 60 s) and hands every decoded
    message to the ``something`` DoFn, which inserts the rows into BigQuery.

    Args:
        argv: optional list of extra command-line style pipeline flags
            (e.g. ``['--num_workers=2']``). ``None`` means no extra flags.
            NOTE(review): options set explicitly in the dict below take
            precedence over the same option given in ``argv``.
    """
    # options required to pass into the pipeline
    options = {
        'staging_location': os.path.join(OUTPUT_DIR, 'tmp', 'staging'),
        'temp_location': os.path.join(OUTPUT_DIR, 'tmp'),
        # Timestamp suffix keeps successive job names unique.
        'job_name': job_name + datetime.now().strftime('%y%m%d-%H%M%S'),
        'project': PROJECT_NAME,
        'teardown_policy': 'TEARDOWN_ALWAYS',
        'no_save_main_session': True,
        'runner': runner,
        'streaming': True,
    }

    # Parameter options, if specified, must be of type PipelineOptions.
    # argv was previously accepted but ignored; forward it as flags.
    opts = beam.pipeline.PipelineOptions(flags=list(argv or []), **options)

    # providing opts which is of the type "PipelineOptions" to the Pipeline
    p = beam.Pipeline(options=opts)

    # Pipeline structure (step labels unchanged so job graphs stay comparable).
    (p
        | 'Pubsub read' >> beam.io.ReadStringsFromPubSub(subscription=SUBSCRIPTION_NAME)
        | 'convert to json object' >> beam.Map(json.loads)
        | 'Create Window' >> beam.WindowInto(window.SlidingWindows(300, 60, offset=0))
        | 'some function' >> beam.ParDo(something())
    )

    # Run the pipeline and wait until the process is completed
    result = p.run()
    result.wait_until_finish()

# Launch the streaming pipeline only when this file is executed directly.
if __name__ == "__main__":
    preprocessing(argv=None)

In [61]:
# Read back the rows currently stored in the destination table and print them.
# print() with a single argument is valid in both Python 2 and Python 3 and
# produces the same output as the former `print i` statement.
fetched = table_ref.fetch_data()
for row in fetched:
    print(row)

(u'male', u'tintin', u'snowy', 1)
(u'Male', u'Forester', u'Weyland', 1)
(u'male', u'tintin', u'snowy', 1)
(u'Male', u'Bard', u'Maken', 664)
(u'Female', u'Daloris', u'Berndsen', 219)
(u'Female', u'Ebba', u'Dee', 561)
(u'Female', u'Tamarra', u'Filyukov', 85)
(u'Male', u'Sascha', u'Sayre', 749)
(u'Female', u'Binnie', u'Phifer', 421)
(u'Male', u'Sampson', u'Swepstone', 336)
(u'Male', u'Nikolos', u'Bossons', 191)
(u'Male', u'Devin', u'Ashleigh', 500)
(u'Female', u'Amelita', u'Gregon', 39)
(u'Male', u'Glenn', u'Dallow', 348)
(u'Female', u'Rosina', u'Ratke', 324)
(u'Male', u'Loy', u'Harce', 515)
(u'Female', u'Aurlie', u'Shelper', 59)
(u'Male', u'Angie', u'Joynes', 431)
(u'Female', u'Olympe', u'Esson', 684)
(u'Female', u'Zaria', u'Apdell', 978)
(u'Female', u'Stevena', u'Abram', 476)
(u'Male', u'Evin', u'Abbots', 285)
(u'Female', u'Orelle', u'Gricks', 366)
(u'Female', u'Cathy', u'Elvidge', 843)
(u'Male', u'Roland', u'Malkinson', 922)
(u'Female', u'Glenn', u'Mackney', 432)
(u'Male', u'Norry', u'