In [1]:
import logging
import apache_beam as beam
import apache_beam.io.gcp.bigquery as bq

In [2]:
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib

In [3]:
INPUT_PROJECT_ID ="my-bq-demo"
INPUT_DATASET_ID="input"
INPUT_TABLE_ID="input_for_transpose"
OUTPUT_PROJECT_ID ="my-bq-demo"
OUTPUT_DATASET_ID="output"
OUTPUT_TABLE_ID="transposed"

In [197]:
INPUT_SCHEMA = {
    'fields': [{
        'name': 'ID', 
        'type': 'INTEGER', 
        'mode': 'REQUIRED'
    }, 
    {
        'name': 'CLASS', 
        'type': 'STRING', 
        'mode': 'NULLABLE'
    },
    {
        'name': 'SALES', 
        'type': 'FLOAT', 
        'mode': 'NULLABLE'
        }]
        }

key_field = ['ID']
pivot_field = ['CLASS']
value_field = ['SALES']


In [198]:
# OUTPUT_SCHEMA = {'fields':[]}

In [199]:
def generateSchemaFromInput(input_schema,key_field,value_field):
    key_schema = {}
    value_schema = {}
    for d in input_schema['fields']:
        for field in key_field:
            if d['name'] == field:
                key_schema[field] = d['type']
        for field in value_field:
            if d['name'] == field:
                value_schema[field] = d['type']
    return key_schema,value_schema

In [200]:
class getKeySchema(beam.DoFn):
    def process(self, element, key_schema):
        rt_dict={}
        rt_dict['mode'] = 'REQUIRED'
        rt_dict['name'] = element
        rt_dict['type'] = key_schema[element]
        yield rt_dict
    

In [201]:
class GetPivotValues(beam.DoFn):
    def process(self, element):
        # print(element)
        rt_elem = {}
        row = element
        for field in pivot_field:
            rt_elem = (field, row[field])
            yield rt_elem

In [202]:
class UniqueList(beam.DoFn):
    def process(self, element):
        rt_elem =  list(set(element[1])) 
        yield rt_elem

In [203]:
class FoldPivotValues(beam.DoFn):
    def process(self, element,value_schema):
        val_key = value_schema.keys()
        rt_dict={}
        rt_dict['mode'] = 'NULLABLE'
        for piv in element:
            for val in val_key:
                name = f"{piv}_{val}"
                rt_dict['name'] = name
                rt_dict['type'] = value_schema[val]
                yield rt_dict

In [207]:
class SplitAsKV(beam.DoFn):
    def process(self,element):
        d = {}
        key = ""
        for k in key_field:
            d[k] = element[k]
            key  += str(element[k])
        for piv in pivot_list:
            for val in value_list:
                name = f"{element[piv]}_{val}"
                d[name] = element[val]
        yield (key,d)

In [205]:
class CreateTableRow(beam.DoFn):
    def process(self,element):
        d = {}
        for elem_d in element[1]:
            for k in elem_d.keys():
                d[k] = elem_d[k]
        yield d

In [233]:
def addFieldsToOutputSchema(element):
    print(f"inside add fields {element} ")
    # dynamic_schema = bigquery.TableFieldSchema()
    # dynamic_schema.name = element.name
    # dynamic_schema.type = element.type
    # dynamic_schema.mode = element.mode
    OUTPUT_SCHEMA['fields'].append(element)
    return None

In [223]:
OUTPUT_SCHEMA = {'fields':[ ]}

In [235]:
OUTPUT_SCHEMA = {}
OUTPUT_SCHEMA['fields'] = []
print(OUTPUT_SCHEMA)
d = {'mode': 'NULLABLE', 'name': 'AAA_SALES', 'type': 'FLOAT'}
addFieldsToOutputSchema(d)
print(OUTPUT_SCHEMA)

{'fields': []}
inside add fields {'mode': 'NULLABLE', 'name': 'AAA_SALES', 'type': 'FLOAT'} 
{'fields': [{'mode': 'NULLABLE', 'name': 'AAA_SALES', 'type': 'FLOAT'}]}


In [236]:
OUTPUT_SCHEMA = {}
OUTPUT_SCHEMA['fields'] = []
print(OUTPUT_SCHEMA)
print(INPUT_SCHEMA)
key_schema,value_schema = generateSchemaFromInput(INPUT_SCHEMA,key_field,value_field)
# addKeyFieldToOutputSchema(key_field, key_schema)
print(OUTPUT_SCHEMA)
print(key_schema)
with beam.Pipeline(InteractiveRunner()) as p:

        input_table = ( p 
                    | "Read BigQuery table" >>  beam.Create(
                         [{"ID":123, "CLASS": "AAA", "SALES":101.44},
                        {"ID":123, "CLASS": "BBB", "SALES":345.44},
                        {"ID":1234, "CLASS": "AAA", "SALES":458.44}]
                        )
                    )
        key_schema = ( p | "Read the id fields from key field" >> beam.Create(
                        key_field )
                      | "Add Key field" >> beam.ParDo(getKeySchema(),key_schema= key_schema)
                      # | "Distinct Values" >> beam.Distinct()
                     )
        
        pivoted_schema = ( input_table
                  | "Get Pivot Schema" >> beam.ParDo(GetPivotValues())        
                  | "Group by pivot field" >> beam.GroupByKey()
                  | "Get unique list" >> beam.ParDo(UniqueList())
                  | "Fold pivot values to columns" >> beam.ParDo(
                        FoldPivotValues(),value_schema= value_schema)
                       )
        dynamic_schema = ( (key_schema,pivoted_schema) 
                            | "Merge two schema" >> beam.Flatten()
                          | "Add to output table schema" >> beam.Map(addFieldsToOutputSchema)
                         )
        
        
        table_row = ( input_table
                  | "Split as Key Value" >> beam.ParDo(SplitAsKV())
                    | "Group by key" >> beam.GroupByKey()
                  | "Create the table row" >> beam.ParDo(CreateTableRow())
                       )
        
#         pivot_rows = ( input_table 
#                        | "Separate key fields" >> beam.Map(separateKeyValue)
#                      )
        
        

{'fields': []}
{'fields': [{'name': 'ID', 'type': 'INTEGER', 'mode': 'REQUIRED'}, {'name': 'CLASS', 'type': 'STRING', 'mode': 'NULLABLE'}, {'name': 'SALES', 'type': 'FLOAT', 'mode': 'NULLABLE'}]}
{'fields': []}
{'ID': 'INTEGER'}




inside add fields {'mode': 'NULLABLE', 'name': 'AAA_SALES', 'type': 'FLOAT'} 
inside add fields {'mode': 'NULLABLE', 'name': 'BBB_SALES', 'type': 'FLOAT'} 
inside add fields {'mode': 'REQUIRED', 'name': 'ID', 'type': 'INTEGER'} 


In [237]:
print(OUTPUT_SCHEMA)

{'fields': [{'mode': 'NULLABLE', 'name': 'AAA_SALES', 'type': 'FLOAT'}, {'mode': 'NULLABLE', 'name': 'BBB_SALES', 'type': 'FLOAT'}, {'mode': 'REQUIRED', 'name': 'ID', 'type': 'INTEGER'}]}


In [209]:
ib.show(input_table)

In [211]:
ib.show(table_row)

In [83]:
ib.show(pivoted_schema)

In [84]:
ib.show(dynamic_schema)

In [57]:
print(OUTPUT_SCHEMA)

{'fields': [{'mode': 'REQUIRED', 'name': 'ID', 'type': 'INTEGER'}, {'mode': 'NULLABLE', 'name': 'AAA_SALES', 'type': 'FLOAT'}, {'mode': 'NULLABLE', 'name': 'AAA_SALES', 'type': 'FLOAT'}]}


In [None]:
'fields': [{'mode': 'REQUIRED', 'name': 'ID', 'type': 'INTEGER'}, 
           {'mode': 'NULLABLE', 'name': 'AAA_SALES', 'type': 'FLOAT'}, 
           {'mode': 'NULLABLE', 'name': 'AAA_SALES', 'type': 'FLOAT'}]}


In [92]:
key_list= ['ID','ID2']
pivot_list = ['CLASS']
value_list =['SALES']

In [151]:
d = {"ID":123, "ID2":111}

In [166]:
i = 123
s = str(i) + "23"
print(s)

12323


In [168]:
s=""
for v in d.values():
    s += str(v)
print(s)

123111


In [188]:
l = [{'ID': 123, 'ID2': 111, 'AAA_SALES': 101.44}, {'ID': 123, 'ID2': 111, 'BBB_SALES': 345.44}]

In [182]:
d1 = {'ID': 123, 'ID2': 111, 'AAA_SALES': 101.44}
d2 = {'ID': 123, 'ID2': 111, 'BBB_SALES': 345.44}

In [189]:
d = {}
for di in l:
    for k in di.keys():
        d[k] = di[k]
print(d)

{'ID': 123, 'ID2': 111, 'AAA_SALES': 101.44, 'BBB_SALES': 345.44}


In [186]:
print(d2)

{'ID': 123, 'ID2': 111, 'BBB_SALES': 345.44}


In [169]:
class SplitAsKV(beam.DoFn):
    def process(self,element):
        d = {}
        key = ""
        for k in key_list:
            d[k] = element[k]
            key  += str(element[k])
        for piv in pivot_list:
            for val in value_list:
                name = f"{element[piv]}_{val}"
                d[name] = element[val]
        yield (key,d)

In [191]:
class CreateTableRow(beam.DoFn):
    def process(self,element):
        d = {}
        for elem_d in element[1]:
            for k in elem_d.keys():
                d[k] = elem_d[k]
        yield d

In [193]:
key_schema,value_schema = generateSchemaFromInput(INPUT_SCHEMA,key_field,value_field)
input_table = ( p 
                    | "Read BigQuery table" >>  beam.Create(
                         [{"ID":123, "ID2":111,"CLASS": "AAA", "SALES":101.44},
                        {"ID":123, "ID2":111, "CLASS": "BBB", "SALES":345.44},
                        {"ID":1234,"ID2":1231, "CLASS": "AAA", "SALES":458.44},
                          {"ID":1234,"ID2":123, "CLASS": "BBB", "SALES":48.34},
                          {"ID":1234,"ID2":123, "CLASS": "AAA", "SALES":418.64}
                         ]
                        )
                    )
table_row = ( input_table
                  | "Split as Key Value" >> beam.ParDo(SplitAsKV())
                    | "Group by key" >> beam.GroupByKey()
                  | "Create the table row" >> beam.ParDo(CreateTableRow())
                       )

In [194]:
ib.show(input_table)



In [195]:
ib.show(table_row)

