In [0]:

# Fetch the AWS credentials from the Databricks secret scope "aws"
access_key = dbutils.secrets.get(scope="aws", key="aws-access-key")
secret_key = dbutils.secrets.get(scope="aws", key="aws-secret-key")

# Register the credentials with Hadoop for the s3n:// scheme.
# NOTE(review): the mount below uses an s3a:// URI with the credentials embedded
# in it, so these s3n settings look unused by this cell — confirm before removing.
hadoop_conf = spark._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
hadoop_conf.set("fs.s3n.awsAccessKeyId", access_key)
hadoop_conf.set("fs.s3n.awsSecretAccessKey", secret_key)

# The secret key is embedded in the mount URI, so "/" must be percent-encoded
encoded_secret_key = secret_key.replace("/", "%2F")
aws_bucket_name = "raf2"
mount_name = "aws-demo"
mount_point = "/mnt/%s" % mount_name

# If you are using Auto Loader file notification mode to load files, provide the AWS Region ID.
# aws_region = "aws-region-id"
# hadoop_conf.set("fs.s3a.endpoint", "s3." + aws_region + ".amazonaws.com")

# Mounting an already-mounted path raises, which made this cell fail on every
# re-run; only mount when the mount point is not registered yet.
already_mounted = any(m.mountPoint == mount_point for m in dbutils.fs.mounts())
if not already_mounted:
    dbutils.fs.mount(
        "s3a://%s:%s@%s" % (access_key, encoded_secret_key, aws_bucket_name),
        mount_point,
    )

# List the bucket contents through the mount as a sanity check
display(dbutils.fs.ls(mount_point))

In [0]:
Enabling spark.eventLog.rolling.enabled and setting spark.eventLog.rolling.maxFileSize gives you rolling event log files instead of a single huge event log file, which may help in some scenarios on its own, but it still doesn’t reduce the overall size of the logs.

The Spark History Server can apply compaction to the rolling event log files to reduce the overall size of the logs; this is enabled by setting the configuration spark.history.fs.eventLog.rolling.maxFilesToRetain on the Spark History Server.

Official page: https://spark.apache.org/docs/latest/monitoring.html 

In [0]:
from pyspark.sql import SparkSession
# Build (or fetch) the SparkSession used by the rest of the notebook.
# The chain is wrapped in parentheses instead of backslash continuations:
# a comment after a trailing "\" is a SyntaxError, so the original cell could not run.
# NOTE(review): if a session already exists, getOrCreate() returns it and some of
# these settings may not take effect — confirm on the target cluster.
spark = (
    SparkSession.builder
    .appName("Kafka to Snowflake through spark")
    .config("spark.python.profile", "true")
    .config("spark.shuffle.compress", "true")
    .config("spark.shuffle.io.preferDirectBufs", "true")
    .config("spark.shuffle.io.backLog", "2")
    .config("spark.shuffle.service.enabled", "true")
    .config("spark.eventLog.enabled", "true")
    # roll the event log into multiple files of at most 10m each
    .config("spark.eventLog.rolling.enabled", "true")
    .config("spark.eventLog.rolling.maxFileSize", "10m")
    # saving the spark logs to S3 (through the mount) to visualize the DAGs later on
    .config("spark.eventLog.dir", "/mnt/%s" % mount_name)
    .getOrCreate()
)

In [0]:
# Read a CSV (first row is the header) from the mounted bucket, cache it,
# run count() as an action over the cached frame, then preview some rows.
# NOTE(review): the "..." path segment looks like a placeholder — confirm the real path.
csv_path = "dbfs:/mnt/%s/..." % mount_name
df = spark.read.option("header", True).csv(csv_path)
df.cache().count()
df.show()

In [0]:
import pyspark.sql.functions as spark_func

# Use the cluster's default parallelism as the number of shuffle partitions
spark.conf.set(
    "spark.sql.shuffle.partitions",
    spark.sparkContext.defaultParallelism,
)

# Databricks provides a full-fledged Kafka server to which Wikipedia edits are written
# in real time from the language-specific IRC channels that Wikipedia posts them to.
# The IRC data is parsed, converted to JSON and sent to a Kafka server with a
# retention period of 3 days.
# Here we only consume those messages, already set up by Databricks, for learning.
Kafka_server = "server1.databricks.training:9092"
kafka_properties = {
    "kafka.bootstrap.servers": Kafka_server,
    "subscribe": "en",  # the English topic, i.e. edits from en.wikipedia.org
    "startingOffsets": "earliest",
    "maxOffsetsPerTrigger": 1000,
}

# Streaming read from Kafka; keep only `value`, cast to string (it holds the JSON)
df = (
    spark.readStream.format("kafka")
    .options(**kafka_properties)
    .load()
    .select(spark_func.col("value").cast("string"))
)

In [0]:
# Bare expression so the notebook renders the un-projected Kafka stream,
# showing which fields the "en" topic source provides.
spark.readStream.format('kafka').options(**kafka_properties).load()

In [0]:
import pyspark.sql.types as spark_type

# Schema of the JSON payload carried in the Kafka `value` column.
# Fields are listed as (name, type) pairs and every one is declared nullable,
# including the nested `geocoding` struct and its members.
schema = spark_type.StructType([
    spark_type.StructField(field_name, field_type, True)
    for field_name, field_type in [
        ('channel', spark_type.StringType()),
        ('comment', spark_type.StringType()),
        ('delta', spark_type.IntegerType()),
        ('flag', spark_type.StringType()),
        ('geocoding', spark_type.StructType([
            spark_type.StructField(geo_name, geo_type, True)
            for geo_name, geo_type in [
                ('city', spark_type.StringType()),
                ('country', spark_type.StringType()),
                ('countrycode2', spark_type.StringType()),
                ('countrycode3', spark_type.StringType()),
                ('stateProvince', spark_type.StringType()),
                ('latitude', spark_type.DoubleType()),
                ('longitude', spark_type.DoubleType()),
            ]
        ])),
        ('isAnonymous', spark_type.BooleanType()),
        ('isNewPage', spark_type.BooleanType()),
        ('isRobot', spark_type.BooleanType()),
        ('isUnpatrolled', spark_type.BooleanType()),
        ('namespace', spark_type.StringType()),
        ('page', spark_type.StringType()),
        ('pageURL', spark_type.StringType()),
        ('timestamp', spark_type.StringType()),
        ('url', spark_type.StringType()),
        ('user', spark_type.StringType()),
        ('userURL', spark_type.StringType()),
        ('wikipediaURL', spark_type.StringType()),
        ('wikipedia', spark_type.StringType()),
    ]
])

In [0]:
# Parse the JSON string held in `value` using `schema`; the result is a single
# struct column named `json`.
parsed_json = spark_func.from_json(spark_func.col('value'), schema).alias('json')
new_df = df.select(parsed_json)

In [0]:
# Keep only edits in the 'article' namespace, then lift the fields of interest
# out of the parsed `json` struct into top-level columns.
#
# The filter runs before the projection because it reads json.namespace, which the
# projection does not keep; the original applied it after the select.
# The separate isNotNull() filter was dropped: a null namespace compared with
# 'article' evaluates to null, and filter() already discards such rows.
new_df2 = (
    new_df
    .filter(spark_func.col('json.namespace') == 'article')
    .select(
        *[
            spark_func.col('json.' + field).alias(field)
            for field in (
                'channel',
                'comment',
                'delta',
                'flag',
                'geocoding',
                'isAnonymous',
                'isNewPage',
                'isRobot',
                'user',
                'wikipediaURL',
                'wikipedia',
            )
        ]
    )
)

In [0]:
# Snowflake connection settings: the account URL and password are read from
# Databricks secret scopes, the remaining options are fixed values.
# NOTE(review): sfUser is hard-coded here — consider moving it to a secret/config.
sfUrl = dbutils.secrets.get(scope="SFUrl", key="sfUrl")
sfPassword = dbutils.secrets.get(scope="SFPass", key="sfPassword")

credential_properties = dict(
    sfUrl=sfUrl,
    sfUser='Rafay007',
    sfPassword=sfPassword,
    sfDatabase='CITIBIKE_rafay',
    sfSchema='PUBLIC',
    sfWarehouse='COMPUTE_WH',
)

In [0]:
# Name of the streaming query and of the target Snowflake table
stream_name, table_name = 'KafkaStream', 'kafka_wiki'

def writeToSnowflake(df,epochId):
  """Append one micro-batch to the Snowflake table named by `table_name`.

  Used as the foreachBatch callback of the streaming query.

  Args:
    df: the micro-batch DataFrame to write.
    epochId: identifier of the micro-batch; only printed.
  """
  print(epochId)
  snowflake_writer = (
    df.write
    .format('snowflake')
    .options(**credential_properties)
    .option('dbtable', table_name)
    .mode('append')
  )
  snowflake_writer.save()
  
if __name__ == '__main__':
  # Start the streaming query: every 3 seconds the pending rows of new_df2 are
  # handed to writeToSnowflake as a micro-batch.
  # NOTE(review): no checkpointLocation option is set on this writer — confirm
  # whether restart/recovery behaviour matters for this job.
  StremProcessing = (
    new_df2.writeStream
    .foreachBatch(writeToSnowflake)
    .trigger(processingTime='3 seconds')
    .queryName(stream_name)
    .start()
  )