In [1]:
# Configure the necessary Spark environment
import os
import sys

spark_home = os.environ.get('SPARK_HOME', None)
sys.path.insert(0, spark_home + "/python")

# Add the py4j to the path.
# You may need to change the version number to match your install -- currently using spark 2.4
sys.path.insert(0, os.path.join(spark_home, 'python/lib/py4j-0.10.7-src.zip'))

#os.environ['PYSPARK_SUBMIT_ARGS']="--jars /work/ericr/spark/sparkdev/postgresql.jar --executor-memory 40g --executor-cores 16 pyspark-shell"

# Initialize PySpark to predefine the SparkContext variable 'sc'
#execfile(os.path.join(spark_home, 'python/pyspark/shell.py'))

In [34]:
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.functions import flatten, explode, col
import json


In [4]:
spark = SparkSession.builder.appName('Basic').config("spark.executor.extraJavaOptions","--executor-memory 40G --executor-cores 40 --driver-class-path $SPARK_HOME/postgresql.jar").getOrCreate()

In [5]:
dbconfig = {"url": "jdbc:postgresql://localhost/qxedb",
            "dbtable": "gateway_eventnotification",
            "user": "hermes",
            "password": "mysecret",
            "driver": "org.postgresql.Driver"}

In [6]:
spark

In [7]:
df = spark.read.jdbc(url="jdbc:postgresql://localhost/qxedb", 
                      table="gateway_eventnotification",
                      properties=dbconfig)

In [8]:
sqlcontext = SQLContext(spark.sparkContext)

In [9]:
df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- payload: string (nullable = true)
 |-- event_category_id: string (nullable = true)
 |-- table_data: string (nullable = true)



In [10]:
payload_df = sqlcontext.read.json(df.rdd.map(lambda r: r.payload))

In [11]:
payload_df.printSchema()

root
 |-- data_type: string (nullable = true)
 |-- events: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- dateTime: string (nullable = true)
 |    |    |-- eventCategory: string (nullable = true)
 |    |    |-- eventDetail: struct (nullable = true)
 |    |    |    |-- brand: string (nullable = true)
 |    |    |    |-- connectionInfo: struct (nullable = true)
 |    |    |    |    |-- localIpv4Address: struct (nullable = true)
 |    |    |    |    |    |-- addr: string (nullable = true)
 |    |    |    |    |    |-- subnetMask: string (nullable = true)
 |    |    |    |    |-- sourceInSameSubnet: string (nullable = true)
 |    |    |    |    |-- sourceIpv4Address: struct (nullable = true)
 |    |    |    |    |    |-- addr: string (nullable = true)
 |    |    |    |    |    |-- subnetMask: string (nullable = true)
 |    |    |    |    |-- sourceIpv6Address: struct (nullable = true)
 |    |    |    |    |    |-- addr: string (nullable = true)
 |   

In [12]:
versions = payload_df.select("version")

In [13]:
versions.show()

+-------+
|version|
+-------+
|  1.1.0|
|  1.1.0|
|  1.1.0|
|  1.1.0|
|  1.1.0|
|  1.1.0|
|  1.1.0|
|  1.1.0|
|  1.1.0|
|  1.1.0|
|  1.1.0|
|   null|
|  1.1.0|
|  1.1.0|
|  1.1.0|
|   null|
|  1.1.0|
|  1.1.0|
|  1.1.0|
|  1.1.0|
+-------+
only showing top 20 rows



In [14]:
flatten(payload_df.events)

Column<b'flatten(events)'>

In [None]:
pyspark.sql.functions

In [None]:
dir(pyspark.sql.functions)

In [17]:
x = payload_df.select(explode(payload_df.events).alias("event"))

In [49]:
x.head(1)

[Row(event=Row(dateTime='2001-02-19T19:01:42.0Z', eventCategory='supply', eventDetail=Row(brand='HP', connectionInfo=None, connectionState=None, connectionStateCurrent=None, connectionStateMax=None, connectionStateMin=None, consumableLabelCode='M', consumableManufacturingSignature='020101f1126ffa0f8285c448769c641023286cab1acee82f43ccc6b7be426800c0186019ec36be2bca80e84947cbc294521fe51eae1a245306fc10bf96fd023caae3083d0dc42b2a20d1f04655986f9a31657407bb', consumablePercentageLevelRemaining=90, consumableProductNumber=None, consumableSerialNumber='560256648', consumableState=None, consumableUniqueId='000000000000000045f0ff19d100088c', dataSource=None, domainConfig=None, downloadDuration=None, dustLevel=None, errorCategory=None, errorCode=None, errorType=None, eventCategory=None, eventCode='17.99.32', eventDetailAssert=None, eventDetailConsumable=None, eventDetailErrorRecovery=None, eventDetailJobStatus=None, eventDetailSystemError=None, eventDetailType=None, eventIntArray=None, eventOccurre

In [99]:
flx = x.select("event.eventDetail.eventCode")

In [None]:
flx.show(3,truncate=False)

In [90]:
def flatten_df1(nested_df):
    flat_cols = [c[0] for c in nested_df.dtypes if c[1][:6] != 'struct']
    nested_cols = [c[0] for c in nested_df.dtypes if c[1][:6] == 'struct']

    flat_df = nested_df.select(flat_cols +
                               [F.col(nc+'.'+c).alias(nc+'_'+c)
                                for nc in nested_cols
                                for c in nested_df.select(nc+'.*').columns])
    return flat_df

In [91]:
def flatten_df(nested_df, layers):
    flat_cols = []
    nested_cols = []
    flat_df = []

    flat_cols.append([c[0] for c in nested_df.dtypes if c[1][:6] != 'struct'])
    nested_cols.append([c[0] for c in nested_df.dtypes if c[1][:6] == 'struct'])

    flat_df.append(nested_df.select(flat_cols[0] +
                               [col(nc+'.'+c).alias(nc+'_'+c)
                                for nc in nested_cols[0]
                                for c in nested_df.select(nc+'.*').columns])
                  )
    for i in range(1, layers):
        print (flat_cols[i-1])
        flat_cols.append([c[0] for c in flat_df[i-1].dtypes if c[1][:6] != 'struct'])
        nested_cols.append([c[0] for c in flat_df[i-1].dtypes if c[1][:6] == 'struct'])

        flat_df.append(flat_df[i-1].select(flat_cols[i] +
                                [col(nc+'.'+c).alias(nc+'_'+c)
                                    for nc in nested_cols[i]
                                    for c in flat_df[i-1].select(nc+'.*').columns])
        )

    return flat_df[-1]

In [93]:
def flattenSchema(schema: StructType, prefix: String = null) : Array[Column] = {
  schema.fields.flatMap(f => {
    val colName = if (prefix == null) f.name else (prefix + "." + f.name)

    f.dataType match {
      case st: StructType => flattenSchema(st, colName)
      case _ => Array(col(colName))
    }
  })
}

SyntaxError: invalid syntax (<ipython-input-93-e553e3372ef4>, line 2)

In [92]:
flat_x = flatten_df(x,3)

[]


AnalysisException: 'Ambiguous reference to fields StructField(selfHeal,StructType(StructField(actions,ArrayType(StringType,true),true), StructField(results,ArrayType(StringType,true),true), StructField(selfHealVersion,StringType,true)),true), StructField(selfheal,StructType(StructField(actions,ArrayType(StringType,true),true), StructField(results,ArrayType(StringType,true),true), StructField(selfhealTest,StringType,true)),true);'

In [81]:
x2 = x.select(explode(x.event.eventDetail).alias("eventDetail"))

AnalysisException: "cannot resolve 'explode(`event`.`eventDetail`)' due to data type mismatch: input to function explode should be array or map type, not struct<brand:string,connectionInfo:struct<localIpv4Address:struct<addr:string,subnetMask:string>,sourceInSameSubnet:string,sourceIpv4Address:struct<addr:string,subnetMask:string>,sourceIpv6Address:struct<addr:string>,sourceMacAddress:string>,connectionState:string,connectionStateCurrent:bigint,connectionStateMax:bigint,connectionStateMin:bigint,consumableLabelCode:string,consumableManufacturingSignature:string,consumablePercentageLevelRemaining:bigint,consumableProductNumber:string,consumableSerialNumber:string,consumableState:string,consumableUniqueId:string,dataSource:string,domainConfig:struct<domainName:string,hostName:string,nameServerConfig:struct<ipv4Addresses:array<struct<addr:string>>,ipv6Addresses:array<struct<addr:string>>>,searchDomains:array<string>>,downloadDuration:bigint,dustLevel:bigint,errorCategory:string,errorCode:string,errorType:string,eventCategory:string,eventCode:string,eventDetailAssert:struct<eventCode:string,eventOccurrences:bigint>,eventDetailConsumable:struct<brand:string,consumableLabelCode:string,consumablePercentageLevelRemaining:bigint,consumableProductNumber:string,consumableSerialNumber:string,consumableState:string,consumableUniqueId:string,eventTrigger:string,installDate:string,isRefilled:string,isSetup:string,isTrial:string,manufactureDate:string,manufacturer:string,measuredQuantityState:string,midLabel:string,prevCartridge:struct<consumablePercentageLevelRemaining:bigint,consumableUniqueId:string>,stID:string,triggerCategory:string,version:string>,eventDetailErrorRecovery:struct<jobErrorRecovery:struct<errorType:string,failureReason:string,recoveryState:string>,version:string>,eventDetailJobStatus:struct<eventCode:string,eventOccurrences:bigint>,eventDetailSystemError:struct<errorCategory:string,eventCode:string,version:string>,eventDetailType:string,eventIntArray:array<bigint>,eventOccurrences:bigint,eventTrigger:string,eventTriggerReason:string,event_code:string,firmwareAssert:struct<app:string,code:string,file:string,fileCrc:string,firmwareVersion:string,line:bigint,type:string>,firmwareVersion:string,fmpRegexCandidate:string,freeMobilePhotoInfo:struct<fmpCounted:string,fmpMaxLength:bigint,fmpMaxWidth:bigint,fmpPageCount:bigint,fmpPages:string,nonFmpPageCount:bigint,nonFmpPages:string>,fwAssert:struct<app:string,code:string,file:string,fileCrc:string,firmwareVersion:string,line:bigint,type:string>,hasUsedNonHp:string,httpUserAgent:string,inkFillCapacity:bigint,inkLevelGauge:bigint,inkLevelSensorValue:bigint,installDate:string,installDuration:bigint,ippJobStatus:struct<dateTimeAtCompleted:string,dateTimeAtCreation:string,dateTimeAtProcessing:string,jobDocumentAccessErrors:string,jobImpressions:bigint,jobImpressionsCompleted:bigint,jobKOctets:bigint,jobKOctetsProcessed:bigint,jobMediaSheets:bigint,jobMediaSheetsCompleted:bigint,jobName:string,jobOriginatingUserName:string,jobPrinterURI:string,jobPrinterUpTime:bigint,jobPrinterUri:string,jobState:string,jobStateMessage:string,jobStateReasons:string,jobURI:string,jobUri:string,jobUuid:string>,ippMediaSize:struct<xDimension:bigint,yDimension:bigint>,isInstantInkSubscription:string,isRefilled:string,isSetup:string,isTrial:string,jobErrorRecovery:struct<errorType:string,failureReason:string,jobRecovery:string,recoveryState:string,version:string>,jobRecovery:string,jobType:string,job_type:string,key:string,linkStats:struct<rxDropped:bigint,rxErrors:bigint,rxErros:bigint,rxPackets:bigint,txDropped:bigint,txErrors:bigint,txPackets:bigint>,makeAndModel:string,manufactureDate:string,manufacturer:string,measuredQuantityState:string,midLabel:string,modelNumber:string,networkConfig:struct<ipv4Address:struct<addr:string,subnetMask:string>,ipv4Gateway:string,ipv6Addresses:array<struct<addr:string>>,ipv6Gateway:string>,networkInterfaceName:string,networkInterfaceType:string,penStatus:string,postStartupSupplyVolume:bigint,prevCartridge:struct<consumablePercentageLevelRemaining:bigint,consumableUniqueId:string>,previousCartridge:struct<consumablePercentageLevelRemaining:bigint,consumableUniqueId:string>,previousFwVersion:string,printInfo:struct<duplex:string,jobImpressionCompleted:bigint,jobImpressionsCompleted:bigint,mediaSizeDetected:struct<xDimension:bigint,yDimension:bigint>,mediaSizeRequested:struct<xDimension:bigint,yDimension:bigint>,mediaType:string,printMode:string>,printerRegion:bigint,productId:string,reason:string,scanInfo:struct<duplex:string,mediaSizeDetected:struct<height:bigint>,mediaSizeRequested:struct<height:bigint,width:bigint>>,selectabilityNumber:string,selfHeal:struct<actions:array<string>,results:array<string>,selfHealVersion:string>,selfheal:struct<actions:array<string>,results:array<string>,selfhealTest:string>,sequenceNumber:bigint,serialNumber:string,stID:string,stId:string,stateEnabled:boolean,stateStabilisationDelay:bigint,supplyFamilyId:bigint,supplyMarkedEmpty:string,supplyModelNumber:bigint,supplyRegion:bigint,supplyType:bigint,t4:bigint,totalInkUsageGauge:bigint,triggerCategory:string,uapConfig:struct<adminDisabled:string,powered:string>,updateMethod:string,updateSuccess:string,updateTriggerReason:string,updatedFwVersion:string,userConfig:struct<adminDisabled:string,changeCounter:bigint,ipv4ConfigMethod:string,ipv4Enabled:string,ipv6ConfigMethod:string,ipv6Enabled:string,passwdChanged:string,powered:string,ssid:string>,versio:string,version:string,wifiConfig:struct<associatedBssid:string,chanGraph:array<struct<anpi:bigint,channel:bigint,loadPercent:bigint,percentLoad:bigint>>,channelGraphs:array<struct<anpi:bigint,channel:bigint,percentLoad:bigint>>,channelInfos:array<struct<channel:bigint,signalAverage:bigint,signalMax:bigint,signalMin:bigint,singalAverage:bigint,singalMax:bigint,singalMin:bigint,ssidsCount:bigint>>,directedScanCount:bigint,lastAssociationCode:string,lastDeauthReason:bigint,signalQuality:struct<noise:bigint,strength:bigint>,undirectedScanCount:bigint>>;;\n'Project [explode(event#39.eventDetail) AS eventDetail#265]\n+- Project [event#39]\n   +- Generate explode(events#14), false, [event#39]\n      +- LogicalRDD [data_type#13, events#14, henq_metadata#15, links#16, metadata#17, originator#18, payloadManifestJec#19, subscriptionId#20, system_check_id#21, version#22], false\n"

In [76]:
systemErrors = x.filter("event.eventCategory == 'systemError'")

In [78]:
systemErrors.show(2,truncate=200)

+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|                                                                                                                                                                                                   event|
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|[2018-08-09T23:18:47.0Z, systemError, [,,,,,,,,,,,,,,,,, hardwareFailure,,,, 71.00.E9,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,, 1.0.0,], com.hp.cdm.service.even...|
|[2001-04-28T16:46:26.0Z, systemError, [,,,,,,,,,,,,,,,,, hardwareFailure,,,, 71.00.E9,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,, 1.0.0,], com.hp.cdm.se

In [18]:
x.printSchema()

root
 |-- event: struct (nullable = true)
 |    |-- dateTime: string (nullable = true)
 |    |-- eventCategory: string (nullable = true)
 |    |-- eventDetail: struct (nullable = true)
 |    |    |-- brand: string (nullable = true)
 |    |    |-- connectionInfo: struct (nullable = true)
 |    |    |    |-- localIpv4Address: struct (nullable = true)
 |    |    |    |    |-- addr: string (nullable = true)
 |    |    |    |    |-- subnetMask: string (nullable = true)
 |    |    |    |-- sourceInSameSubnet: string (nullable = true)
 |    |    |    |-- sourceIpv4Address: struct (nullable = true)
 |    |    |    |    |-- addr: string (nullable = true)
 |    |    |    |    |-- subnetMask: string (nullable = true)
 |    |    |    |-- sourceIpv6Address: struct (nullable = true)
 |    |    |    |    |-- addr: string (nullable = true)
 |    |    |    |-- sourceMacAddress: string (nullable = true)
 |    |    |-- connectionState: string (nullable = true)
 |    |    |-- connectionStateCurrent: long 

In [72]:
x.unpersist()

DataFrame[event: struct<dateTime:string,eventCategory:string,eventDetail:struct<brand:string,connectionInfo:struct<localIpv4Address:struct<addr:string,subnetMask:string>,sourceInSameSubnet:string,sourceIpv4Address:struct<addr:string,subnetMask:string>,sourceIpv6Address:struct<addr:string>,sourceMacAddress:string>,connectionState:string,connectionStateCurrent:bigint,connectionStateMax:bigint,connectionStateMin:bigint,consumableLabelCode:string,consumableManufacturingSignature:string,consumablePercentageLevelRemaining:bigint,consumableProductNumber:string,consumableSerialNumber:string,consumableState:string,consumableUniqueId:string,dataSource:string,domainConfig:struct<domainName:string,hostName:string,nameServerConfig:struct<ipv4Addresses:array<struct<addr:string>>,ipv6Addresses:array<struct<addr:string>>>,searchDomains:array<string>>,downloadDuration:bigint,dustLevel:bigint,errorCategory:string,errorCode:string,errorType:string,eventCategory:string,eventCode:string,eventDetailAssert:s

In [50]:
y = x.select("event.eventCategory")

In [52]:
y.show()

+-------------+
|eventCategory|
+-------------+
|       supply|
|       supply|
|       supply|
|       supply|
|    jobStatus|
|    jobStatus|
|  wifiNetwork|
|       supply|
|       supply|
|       supply|
|       supply|
|       supply|
|       supply|
|  wifiNetwork|
|    jobStatus|
|       supply|
|       supply|
| system_check|
|     jobError|
|     jobError|
+-------------+
only showing top 20 rows



In [65]:
for t in y.groupby(['eventCategory']).count().collect():
    print(t)

Row(eventCategory='system_check', count=491)
Row(eventCategory='systemError', count=64419)
Row(eventCategory='printHead', count=12552)
Row(eventCategory='fwUpdate', count=24206)
Row(eventCategory='wifiNetwork', count=229454)
Row(eventCategory='jobStatus', count=73112)
Row(eventCategory='errorRecovery', count=25194)
Row(eventCategory='systemTest', count=305)
Row(eventCategory='jobError', count=235575)
Row(eventCategory='testInfo', count=92)
Row(eventCategory='info', count=2)
Row(eventCategory='test', count=2924)
Row(eventCategory='supply', count=744227)
Row(eventCategory='userConfigChanged', count=4)


In [55]:
z.printSchema()

AttributeError: 'GroupedData' object has no attribute 'printSchema'

In [61]:
fwall = x.select(["event.eventCategory","event.eventDetail.firmwareVersion","event.eventDetail.firmwareAssert.code"])

In [66]:
fwasserts = fwall.filter("eventCategory == 'systemError'")

In [70]:
fwasserts.cache()

DataFrame[eventCategory: string, firmwareVersion: string, code: string]

In [73]:
fwasserts.printSchema()

root
 |-- eventCategory: string (nullable = true)
 |-- firmwareVersion: string (nullable = true)
 |-- code: string (nullable = true)



In [74]:
fwasserts.show(truncate=200)

+-------------+---------------+----+
|eventCategory|firmwareVersion|code|
+-------------+---------------+----+
|  systemError|           null|null|
|  systemError|           null|null|
|  systemError|           null|null|
|  systemError|           null|null|
|  systemError|           null|null|
|  systemError|           null|null|
|  systemError|           null|null|
|  systemError|           null|null|
|  systemError|           null|null|
|  systemError|           null|null|
|  systemError|           null|null|
|  systemError|           null|null|
|  systemError|           null|null|
|  systemError|           null|null|
|  systemError|           null|null|
|  systemError|           null|null|
|  systemError|           null|null|
|  systemError|           null|null|
|  systemError|           null|null|
|  systemError|           null|null|
+-------------+---------------+----+
only showing top 20 rows



In [75]:
fwasserts.count()

64419