In [1]:
# findspark.init() is called before `import pyspark`; presumably it makes the
# local Spark installation importable -- TODO confirm against the findspark docs.
# Keep this ordering when editing the cell.
import findspark
findspark.init()
import pyspark
# NOTE(review): `random` is not used by any cell visible in this notebook.
import random
from decimal import Decimal
# Shared Spark context; later cells use `sc` to read the input text file and
# to construct the SQLContext.
sc = pyspark.SparkContext()
import json

In [2]:
def unmarshal_dynamodb_json(node):
    """Unmarshal a whole DynamoDB-JSON item.

    The item is wrapped under an 'M' (map) type descriptor and handed to
    _unmarshal_value, whose result is returned unchanged.
    """
    return _unmarshal_value({'M': node})


def _unmarshal_value(node):
    if type(node) is not dict:
        return node

    for key, value in node.items():
        # S – String - return string
        # N – Number - return int or float (if includes '.')
        # B – Binary - not handled
        # BOOL – Boolean - return Bool
        # NULL – Null - return None
        # M – Map - return a dict
        # L – List - return a list
        # SS – String Set - not handled
        # NN – Number Set - not handled
        # BB – Binary Set - not handled
        key = key.lower()
        if key == 'bool':
            return value
        if key == 'null':
            return None
        if key == 's':
            return value
        if key == 'n':
            if '.' in str(value):
                return Decimal(value)
            return int(value)
        if key in ['m', 'l']:
            if key == 'm':
                data = {}
                for key1, value1 in value.items():
                    if key1.lower() == 'l':
                        data = [_unmarshal_value(n) for n in value1]
                    else:
                        if type(value1) is not dict:
                            return _unmarshal_value(value)
                        data[key1] = _unmarshal_value(value1)
                return data
            data = []
            for item in value:
                data.append(_unmarshal_value(item))
            return data

In [3]:
from pyspark import SparkContext
from pyspark.sql import SQLContext

# One RDD element per line of the input file; each line is parsed as JSON
# by the next cell.
log_txt = sc.textFile("parsing.txt")

In [4]:
# Parse every input line as JSON, then strip the DynamoDB type descriptors.
dfout = log_txt.map(
    lambda line: unmarshal_dynamodb_json(json.loads(line))
)

In [5]:
# Sanity check: print the first unmarshalled record.
print(dfout.first())

{u'updated': u'2017-09-21T20:00:53.065618', u'is_fake_company': False, u'tenant_id': 1, u'is_shiftgig_company': False, u'is_status_company': True, u'company_id': 251, u'company_name': u'Sodexo | Northwestern University:McCormick School of Engineering', u'is_geo_enabled_company': False, u'company_market_id': 1, u'is_dead_company': False}


In [6]:
# dfout is still an RDD at this point (not a DataFrame).
type(dfout)

pyspark.rdd.PipelinedRDD

In [8]:
# Peek at a few records to check the keys and value types.
dfout.take(5)

[{u'company_id': 251,
  u'company_market_id': 1,
  u'company_name': u'Sodexo | Northwestern University:McCormick School of Engineering',
  u'is_dead_company': False,
  u'is_fake_company': False,
  u'is_geo_enabled_company': False,
  u'is_shiftgig_company': False,
  u'is_status_company': True,
  u'tenant_id': 1,
  u'updated': u'2017-09-21T20:00:53.065618'},
 {u'company_id': 3757,
  u'company_market_id': 10,
  u'company_name': u'The Victory Ship Inc.',
  u'is_dead_company': False,
  u'is_fake_company': False,
  u'is_geo_enabled_company': False,
  u'is_shiftgig_company': False,
  u'is_status_company': True,
  u'tenant_id': 1,
  u'updated': u'2018-04-13T09:38:34.977732'},
 {u'company_id': 32,
  u'company_market_id': 1,
  u'company_name': u'Do Not Use- Levy Restaurants',
  u'is_dead_company': True,
  u'is_fake_company': False,
  u'is_geo_enabled_company': False,
  u'is_shiftgig_company': False,
  u'is_status_company': True,
  u'tenant_id': 1,
  u'updated': u'2017-09-21T20:00:53.065618'},
 {

In [13]:
from pyspark.sql import SQLContext

sqlContext = SQLContext(sc)

# The records in dfout are dicts.  The column names below are listed in
# alphabetical order, the same order in which the record keys sort.
# NOTE(review): Spark appears to infer dict-based schemas in sorted-key order
# and then apply these names positionally, so this list presumably has to
# stay alphabetical -- confirm against the createDataFrame docs.
df_csv = sqlContext.createDataFrame(
    dfout,
    [
        'company_id',
        'company_market_id',
        'company_name',
        'is_dead_company',
        'is_fake_company',
        'is_geo_enabled_company',
        'is_shiftgig_company',
        'is_status_company',
        'tenant_id',
        'updated',
    ]
)



In [14]:
# Reduce to a single partition before writing, and emit a header row.
# NOTE(review): 'csv_out.csv' is presumably created as an output directory
# rather than a single file -- confirm against the Spark writer docs.
(
    df_csv
    .coalesce(1)
    .write
    .format('com.databricks.spark.csv')
    .options(header='true')
    .save('csv_out.csv')
)
