In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *
import pandas as pd
import os

# Elasticsearch endpoint used by the reader below. Override it with the
# ES_NODES environment variable; the default is the host this notebook was
# written against.
ES_NODES = os.environ.get("ES_NODES", "elksj-elasticsearch:9200")

# Spark session for this notebook (Hive support enabled).
spark = (
    SparkSession.builder
    .appName("NetFlowReader")
    .enableHiveSupport()
    .getOrCreate()
)

# Reusable Elasticsearch DataFrame reader; call .load(<index pattern>) on it
# to get a Spark DataFrame for that pattern.
es_reader = (
    spark.read
    .format("org.elasticsearch.spark.sql")
    .option("inferSchema", "true")
    .option("es.nodes", ES_NODES)
)

In [2]:
# Index pattern to read from Elasticsearch; "netflow-*/" is the alternative
# pattern for NetFlow records.
index_pattern = "zeek-*/"
flowDF = es_reader.load(index_pattern)

# Cap the number of log records at 3 to get a tiny DataFrame for a quick look.
testflowDF = flowDF.limit(3)
testflowDF.show()

# Other ad-hoc inspections, kept for reference:
#   flowDF.describe().show()
#   flowDF.select("ssl").show(truncate=False)

In [20]:
# Convert the Spark DataFrame to a pandas DataFrame.
# NOTE(review): toPandas() collects every row of flowDF (the full index pattern
# loaded above, not the 3-row testflowDF preview) into driver memory.
flowpdDF = flowDF.toPandas()

In [21]:
# Numeric column names -> readable field names. The names match the
# NetFlow v9 / IPFIX field-type IDs (presumably how the exporter's fields are
# keyed in Elasticsearch — confirm against the index mapping). Ordered by ID.
NETFLOW_FIELD_NAMES = {
    "1": "IN_BYTES",
    "2": "IN_PKTS",
    "4": "PROTOCOL",
    "5": "SRC_TOS",
    "6": "TCP_FLAGS",
    "7": "L4_SRC_PORT",
    "8": "IPV4_SRC_ADDR",
    "10": "INPUT_SNMP",
    "11": "L4_DST_PORT",
    "12": "IPV4_DST_ADDR",
    "14": "OUTPUT_SNMP",
    "15": "IPV4_NEXT_HOP",
    "16": "SRC_AS",
    "17": "DST_AS",
    "18": "BGP_IPV4_NEXT_HOP",
    "21": "LAST_SWITCHED",
    "22": "FIRST_SWITCHED",
    "23": "OUT_BYTES",
    "32": "ICMP_TYPE",
    "34": "SAMPLING_INTERVAL",
    "42": "TOTAL_FLOWS_EXP",
    "60": "IP_PROTOCOL_VERSION",
    "130": "EXPORTER_IPV4_ADDRESS",
    "152": "FLOW_START_MILLISECONDS",
    "153": "FLOW_END_MILLISECONDS",
}

# rename() returns a new frame and ignores keys that are not present, so this
# is a no-op for sources without numeric column names (e.g. the zeek-* indices)
# and is safe to re-run.
flowpdDF = flowpdDF.rename(columns=NETFLOW_FIELD_NAMES)
flowpdDF

In [22]:
# Load the ASN enrichment table as a pandas DF: a '|'-separated file,
# indexed by its ASN column.
asnDF = (
    pd.read_csv('asn_enrich.csv', sep='|')
    .set_index('ASN')
)

# Column-oriented lookup: asn_dict[<column>][<ASN>] -> value,
# e.g. asn_dict['ORG'][asn] as used by the enrichment cell.
asn_dict = asnDF.to_dict(orient='dict')
print(asn_dict)

In [23]:
# Enrich flows with ASN organisation/country (currently disabled): any ASN not found in asn_dict gets ORG '-Reserved AS-' and country code 'ZZ'
# flowpdDF['SRC_ORG'] = flowpdDF['SRC_AS'].apply(lambda x: asn_dict['ORG'][x] if x in asn_dict['ORG'] else '-Reserved AS-')
# flowpdDF['SRC_COUNTRY'] = flowpdDF['SRC_AS'].apply(lambda x: asn_dict['COUNTRY'][x] if x in asn_dict['ORG'] else 'ZZ')

# flowpdDF['DST_ORG'] = flowpdDF['DST_AS'].apply(lambda x: asn_dict['ORG'][x] if x in asn_dict['ORG'] else '-Reserved AS-')
# flowpdDF['DST_COUNTRY'] = flowpdDF['DST_AS'].apply(lambda x: asn_dict['COUNTRY'][x] if x in asn_dict['ORG'] else 'ZZ')

In [24]:
# Preview the first rows of the pandas frame.
# NOTE(review): the saved output below (log_type 'netflow', SRC_ORG/SRC_COUNTRY/
# DST_ORG/DST_COUNTRY columns) does not match the current pipeline — cell 2 loads
# zeek-* and the enrichment cell is commented out. Re-run to refresh it.
flowpdDF.head()

Unnamed: 0,IN_BYTES,INPUT_SNMP,L4_DST_PORT,IPV4_DST_ADDR,EXPORTER_IPV4_ADDRESS,OUTPUT_SNMP,IPV4_NEXT_HOP,FLOW_START_MILLISECONDS,FLOW_END_MILLISECONDS,SRC_AS,...,IP_PROTOCOL_VERSION,L4_SRC_PORT,IPV4_SRC_ADDR,@timestamp,@version,log_type,SRC_ORG,SRC_COUNTRY,DST_ORG,DST_COUNTRY
0,326,0,443,204.14.235.124,172.18.0.9,0,0.0.0.0,1620986000000.0,1620986000000.0,0,...,4.0,50706,172.16.133.75,2021-05-14 09:54:20.653,1,netflow,-Reserved AS-,ZZ,SALESFORCE,US
1,846,0,5440,172.16.139.250,172.18.0.9,0,0.0.0.0,1620986000000.0,1620986000000.0,0,...,4.0,50892,172.16.133.75,2021-05-14 09:54:20.653,1,netflow,-Reserved AS-,ZZ,-Reserved AS-,ZZ
2,846,0,5440,172.16.139.250,172.18.0.9,0,0.0.0.0,1620986000000.0,1620986000000.0,0,...,4.0,52748,172.16.133.41,2021-05-14 09:54:20.653,1,netflow,-Reserved AS-,ZZ,-Reserved AS-,ZZ
3,326,0,443,204.14.235.124,172.18.0.9,0,0.0.0.0,1620986000000.0,1620986000000.0,0,...,4.0,50707,172.16.133.75,2021-05-14 09:54:20.654,1,netflow,-Reserved AS-,ZZ,SALESFORCE,US
4,326,0,443,204.14.235.124,172.18.0.9,0,0.0.0.0,1620986000000.0,1620986000000.0,0,...,4.0,50705,172.16.133.75,2021-05-14 09:54:20.654,1,netflow,-Reserved AS-,ZZ,SALESFORCE,US


In [25]:
# Convert the pandas DF back to Spark, loading every column as a nullable string.
# The schema is derived from flowpdDF's own columns. A hand-written fixed-length
# schema (previously 21 placeholder fields column_1..column_21) fails as soon as
# the frame has a different number of columns, and it discards the readable
# column names set by the rename cell.
schema = StructType([
    StructField(str(column_name), StringType(), True)
    for column_name in flowpdDF.columns
])

# Spark's schema verification expects str (or None) for StringType fields, but
# the frame holds ints/floats/timestamps. Stringify every value, then map pandas
# missing values (NaN/NaT/None) back to None so they become Spark nulls instead
# of the literal text "nan"/"NaT". The extra astype(object) keeps None as None
# even on pandas versions where astype(str) yields a dedicated string dtype.
flowpdDF_as_str = (
    flowpdDF
    .astype(str)
    .astype(object)
    .where(flowpdDF.notna(), None)
)
flowDF = spark.createDataFrame(flowpdDF_as_str, schema=schema)

In [26]:
# # Store to elasticsearch
# (flowDF.write.format('org.elasticsearch.spark.sql')
#              .option("es.nodes","elksj-elasticsearch:9200")
#              .option('es.resource', '%s/%s' % ('enriched_netflow', 'netflow'))
#              .save())