# 1. Extract data from tables stored in MariaDB

In [1]:
# Connect to the cdw_sapp MariaDB database with MySQL Connector/Python.
# The driver is imported under its own name so that binding the connection to
# `mariadb` below no longer shadows the driver module.
import os

import mysql.connector

# The password is read from MARIADB_PASSWORD when set; the empty-string default
# matches the original local setup. Do not commit real credentials to the notebook.
mariadb = mysql.connector.connect(
    user="root",
    password=os.environ.get("MARIADB_PASSWORD", ""),
    database="cdw_sapp",
)
print('Connected to MariaDB database ...')
# NOTE(review): no visible cell uses this cursor; close it and the connection
# (cur.close(); mariadb.close()) once it is no longer needed.
cur = mariadb.cursor()

Connected to MariaDB database ...


# 2. MariaDB → Spark SQL → MongoDB 
(1) Using Python to extract data from tables stored in MariaDB. 
(2) Transforming the data extracted from MariaDB using Spark (SparkSQL). 
(3) Loading the transformed data using Spark into MongoDB.

Part 1. Branch

In [2]:
# Read cdw_sapp_branch from MariaDB over JDBC, clean it with Spark SQL, and keep
# the transformed result in `df1` (loaded into MongoDB by the next cell).
import os

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# JDBC source for the branch table. The password is read from MARIADB_PASSWORD;
# the "" default matches the original setup.
branch_raw_df = spark.read.format("jdbc").options(
    url="jdbc:mysql://localhost:3306/cdw_sapp",
    driver="com.mysql.cj.jdbc.Driver",
    dbtable="cdw_sapp_branch",
    user="root",
    password=os.environ.get("MARIADB_PASSWORD", ""),
).load()

# Register the raw branch table for Spark SQL as the temp view `branch`.
branch_raw_df.createOrReplaceTempView('branch')

# Transformations:
#   - BRANCH_ZIP: a missing zip code is replaced with 99999
#   - BRANCH_PHONE: the digit string is reformatted as (XXX) XXX-XXXX
df1 = spark.sql(
    "SELECT BRANCH_CODE, BRANCH_NAME, BRANCH_STREET, BRANCH_CITY, BRANCH_STATE, "
    "IFNULL(branch_zip, 99999) BRANCH_ZIP, "
    "CONCAT('(', SUBSTR(branch_phone, 1, 3), ') ', SUBSTR(branch_phone, 4, 3), '-', "
    "SUBSTR(branch_phone, 7)) AS BRANCH_PHONE, "
    "LAST_UPDATED FROM branch"
)
df1.show()

+-----------+------------+-------------------+-----------------+------------+----------+--------------+-------------------+
|BRANCH_CODE| BRANCH_NAME|      BRANCH_STREET|      BRANCH_CITY|BRANCH_STATE|BRANCH_ZIP|  BRANCH_PHONE|       LAST_UPDATED|
+-----------+------------+-------------------+-----------------+------------+----------+--------------+-------------------+
|          1|Example Bank|       Bridle Court|        Lakeville|          MN|     55044|(123) 456-5276|2018-04-18 16:51:47|
|          2|Example Bank|  Washington Street|          Huntley|          IL|     60142|(123) 461-8993|2018-04-18 16:51:47|
|          3|Example Bank|      Warren Street|SouthRichmondHill|          NY|     11419|(123) 498-5926|2018-04-18 16:51:47|
|          4|Example Bank|   Cleveland Street|       Middleburg|          FL|     32068|(123) 466-3064|2018-04-18 16:51:47|
|          5|Example Bank|        14th Street|    KingOfPrussia|          PA|     19406|(123) 484-9701|2018-04-18 16:51:47|
|       

In [3]:
# Load the transformed branch data (df1) into MongoDB:
# database cdw_sapp, collection cdw_sapp_branch.
uri = "mongodb://127.0.0.1/cdw_sapp.dbs"
# getOrCreate() returns the SparkSession that already exists in this kernel, so
# these builder settings may not reach the connector. The writer below therefore
# also receives `uri` directly.
spark_mongodb = (SparkSession.builder
                 .config("spark.mongodb.input.uri", uri)
                 .config("spark.mongodb.output.uri", uri)
                 .getOrCreate())
(df1.write.format("com.mongodb.spark.sql.DefaultSource")
    .mode('append')
    .option('uri', uri)
    .option('database', 'cdw_sapp')
    .option('collection', 'cdw_sapp_branch')
    .save())
df1.show()
# The result should be 115 documents after a first run into an empty collection;
# mode('append') inserts another copy of every row each time this cell is re-run.

+-----------+------------+-------------------+-----------------+------------+----------+--------------+-------------------+
|BRANCH_CODE| BRANCH_NAME|      BRANCH_STREET|      BRANCH_CITY|BRANCH_STATE|BRANCH_ZIP|  BRANCH_PHONE|       LAST_UPDATED|
+-----------+------------+-------------------+-----------------+------------+----------+--------------+-------------------+
|          1|Example Bank|       Bridle Court|        Lakeville|          MN|     55044|(123) 456-5276|2018-04-18 16:51:47|
|          2|Example Bank|  Washington Street|          Huntley|          IL|     60142|(123) 461-8993|2018-04-18 16:51:47|
|          3|Example Bank|      Warren Street|SouthRichmondHill|          NY|     11419|(123) 498-5926|2018-04-18 16:51:47|
|          4|Example Bank|   Cleveland Street|       Middleburg|          FL|     32068|(123) 466-3064|2018-04-18 16:51:47|
|          5|Example Bank|        14th Street|    KingOfPrussia|          PA|     19406|(123) 484-9701|2018-04-18 16:51:47|
|       

Part 2. Credit Card

In [4]:
# Read cdw_sapp_creditcard from MariaDB over JDBC, reshape it with Spark SQL, and
# keep the transformed result in `df2` (loaded into MongoDB by the next cell).
import os

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# JDBC source for the credit card table. The password is read from
# MARIADB_PASSWORD; the "" default matches the original setup.
creditcard_raw_df = spark.read.format("jdbc").options(
    url="jdbc:mysql://localhost:3306/cdw_sapp",
    driver="com.mysql.cj.jdbc.Driver",
    dbtable="cdw_sapp_creditcard",
    user="root",
    password=os.environ.get("MARIADB_PASSWORD", ""),
).load()

# Register the raw credit card table for Spark SQL as the temp view `creditcard`.
creditcard_raw_df.createOrReplaceTempView('creditcard')

# Transformations:
#   - CREDIT_CARD_NO is renamed to CUST_CC_NO
#   - TIMEID is YYYYMMDD built from YEAR, MONTH and DAY; month and day are
#     left-padded to two characters with the string '0' (LPAD's pad is a string)
df2 = spark.sql(
    "SELECT CREDIT_CARD_NO CUST_CC_NO, "
    "CONCAT(YEAR, LPAD(Month, 2, '0'), LPAD(Day, 2, '0')) TIMEID, "
    "CUST_SSN, BRANCH_CODE, TRANSACTION_TYPE, TRANSACTION_VALUE, TRANSACTION_ID "
    "FROM creditcard"
)
df2.show()

+----------------+--------+---------+-----------+----------------+-----------------+--------------+
|      CUST_CC_NO|  TIMEID| CUST_SSN|BRANCH_CODE|TRANSACTION_TYPE|TRANSACTION_VALUE|TRANSACTION_ID|
+----------------+--------+---------+-----------+----------------+-----------------+--------------+
|4210653349028689|20180214|123459988|        114|       Education|           78.900|             1|
|4210653349028689|20180320|123459988|         35|   Entertainment|           14.240|             2|
|4210653349028689|20180708|123459988|        160|         Grocery|           56.700|             3|
|4210653349028689|20180419|123459988|        114|   Entertainment|           59.730|             4|
|4210653349028689|20181010|123459988|         93|             Gas|            3.590|             5|
|4210653349028689|20180528|123459988|        164|       Education|            6.890|             6|
|4210653349028689|20180519|123459988|        119|   Entertainment|           43.390|             7|


In [5]:
# Load the transformed credit card data (df2) into MongoDB:
# database cdw_sapp, collection cdw_sapp_creditcard.
uri = "mongodb://127.0.0.1/cdw_sapp.dbs"
# getOrCreate() returns the SparkSession that already exists in this kernel, so
# these builder settings may not reach the connector. The writer below therefore
# also receives `uri` directly.
spark_mongodb = (SparkSession.builder
                 .config("spark.mongodb.input.uri", uri)
                 .config("spark.mongodb.output.uri", uri)
                 .getOrCreate())
(df2.write.format("com.mongodb.spark.sql.DefaultSource")
    .mode('append')
    .option('uri', uri)
    .option('database', 'cdw_sapp')
    .option('collection', 'cdw_sapp_creditcard')
    .save())
df2.show()
# The result should be 46694 documents after a first run into an empty collection;
# mode('append') inserts another copy of every row each time this cell is re-run.

+----------------+--------+---------+-----------+----------------+-----------------+--------------+
|      CUST_CC_NO|  TIMEID| CUST_SSN|BRANCH_CODE|TRANSACTION_TYPE|TRANSACTION_VALUE|TRANSACTION_ID|
+----------------+--------+---------+-----------+----------------+-----------------+--------------+
|4210653349028689|20180214|123459988|        114|       Education|           78.900|             1|
|4210653349028689|20180320|123459988|         35|   Entertainment|           14.240|             2|
|4210653349028689|20180708|123459988|        160|         Grocery|           56.700|             3|
|4210653349028689|20180419|123459988|        114|   Entertainment|           59.730|             4|
|4210653349028689|20181010|123459988|         93|             Gas|            3.590|             5|
|4210653349028689|20180528|123459988|        164|       Education|            6.890|             6|
|4210653349028689|20180519|123459988|        119|   Entertainment|           43.390|             7|


Part 3. Customer

In [6]:
# Read cdw_sapp_customer from MariaDB over JDBC, clean it with Spark SQL, and
# keep the transformed result in `df3` (loaded into MongoDB by the next cell).
import os

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# JDBC source for the customer table. The password is read from
# MARIADB_PASSWORD; the "" default matches the original setup.
customer_raw_df = spark.read.format("jdbc").options(
    url="jdbc:mysql://localhost:3306/cdw_sapp",
    driver="com.mysql.cj.jdbc.Driver",
    dbtable="cdw_sapp_customer",
    user="root",
    password=os.environ.get("MARIADB_PASSWORD", ""),
).load()

# Register the raw customer table for Spark SQL as the temp view `customer`.
customer_raw_df.createOrReplaceTempView('customer')

# Transformations:
#   - first and last names: first letter upper case, rest lower case
#   - middle name: entirely lower case
#   - CUST_STREET: apartment number and street name joined with a space
#   - CUST_PHONE: NOTE(review) a value like 1237818 becomes 12-37-818 (see the
#     output below); a 7-digit phone is usually written XXX-XXXX, so confirm this
#     split against the mapping spec.
df3 = spark.sql("""
    SELECT SSN CUST_SSN,
           CONCAT(UCASE(SUBSTRING(`FIRST_NAME`, 1, 1)), LOWER(SUBSTRING(`FIRST_NAME`, 2))) AS CUST_F_NAME,
           LOWER(`MIDDLE_NAME`) AS CUST_M_NAME,
           CONCAT(UCASE(SUBSTRING(`LAST_NAME`, 1, 1)), LOWER(SUBSTRING(`LAST_NAME`, 2))) AS CUST_L_NAME,
           CREDIT_CARD_NO CUST_CC_NO,
           CONCAT(APT_NO, ' ', STREET_NAME) AS CUST_STREET,
           CUST_CITY,
           CUST_STATE,
           CUST_COUNTRY,
           CUST_ZIP,
           CONCAT(SUBSTRING(cust_phone, 1, 2), '-', SUBSTRING(cust_phone, 3, 2), '-', SUBSTRING(cust_phone, 5, 7)) AS CUST_PHONE,
           CUST_EMAIL,
           LAST_UPDATED
    FROM customer
""")
df3.show()

+---------+-----------+-----------+-----------+----------------+--------------------+------------+----------+-------------+--------+----------+--------------------+-------------------+
| CUST_SSN|CUST_F_NAME|CUST_M_NAME|CUST_L_NAME|      CUST_CC_NO|         CUST_STREET|   CUST_CITY|CUST_STATE| CUST_COUNTRY|CUST_ZIP|CUST_PHONE|          CUST_EMAIL|       LAST_UPDATED|
+---------+-----------+-----------+-----------+----------------+--------------------+------------+----------+-------------+--------+----------+--------------------+-------------------+
|123456100|       Alec|         wm|     Hooper|4210653310061055|656 Main Street N...|     Natchez|        MS|United States|   39120| 12-37-818| AHooper@example.com|2018-04-21 12:49:02|
|123453023|       Etta|    brendan|     Holman|4210653310102868|   829 Redwood Drive|Wethersfield|        CT|United States|   06109| 12-38-933| EHolman@example.com|2018-04-21 12:49:02|
|123454487|     Wilber|   ezequiel|     Dunham|4210653310116272|683 12th St

In [7]:
# Load the transformed customer data (df3) into MongoDB:
# database cdw_sapp, collection cdw_sapp_customer.
uri = "mongodb://127.0.0.1/cdw_sapp.dbs"
# getOrCreate() returns the SparkSession that already exists in this kernel, so
# these builder settings may not reach the connector. The writer below therefore
# also receives `uri` directly.
spark_mongodb = (SparkSession.builder
                 .config("spark.mongodb.input.uri", uri)
                 .config("spark.mongodb.output.uri", uri)
                 .getOrCreate())
(df3.write.format("com.mongodb.spark.sql.DefaultSource")
    .mode('append')
    .option('uri', uri)
    .option('database', 'cdw_sapp')
    .option('collection', 'cdw_sapp_customer')
    .save())
df3.show()
# The result should be 952 documents after a first run into an empty collection;
# mode('append') inserts another copy of every row each time this cell is re-run.

+---------+-----------+-----------+-----------+----------------+--------------------+------------+----------+-------------+--------+----------+--------------------+-------------------+
| CUST_SSN|CUST_F_NAME|CUST_M_NAME|CUST_L_NAME|      CUST_CC_NO|         CUST_STREET|   CUST_CITY|CUST_STATE| CUST_COUNTRY|CUST_ZIP|CUST_PHONE|          CUST_EMAIL|       LAST_UPDATED|
+---------+-----------+-----------+-----------+----------------+--------------------+------------+----------+-------------+--------+----------+--------------------+-------------------+
|123456100|       Alec|         wm|     Hooper|4210653310061055|656 Main Street N...|     Natchez|        MS|United States|   39120| 12-37-818| AHooper@example.com|2018-04-21 12:49:02|
|123453023|       Etta|    brendan|     Holman|4210653310102868|   829 Redwood Drive|Wethersfield|        CT|United States|   06109| 12-38-933| EHolman@example.com|2018-04-21 12:49:02|
|123454487|     Wilber|   ezequiel|     Dunham|4210653310116272|683 12th St

# 3. URL or Document to Kafka

(1) Extracting JSON, CSV, and other text data from web URLs and publishing it, one line per message, to Kafka topics (Kafka is a distributed event-streaming platform).

(2) Using the Kafka connector for Spark Structured Streaming to read each topic and transform the data before it is loaded into MongoDB collections.

(3) Loading the transformed data into MongoDB.

Part 1. Insurance charges: database `health_insurance`, collection `health_insurance_insurancecharges`.

In [None]:
# This will get the data from the following url: 
# https://raw.githubusercontent.com/platformps/Healthcare-Insurance-Data/master/insurance.txt
import requests, os
from kafka import KafkaProducer
from pyspark.sql import SparkSession, Row

def kafka_prod(url="https://raw.githubusercontent.com/platformps/Healthcare-Insurance-Data/master/insurance.txt",
               topic='insurance'):
    """Download a delimited text file and publish each data line to a Kafka topic.

    The first line (the header) is skipped. Every remaining non-empty line is sent
    as one UTF-8 encoded message to the broker at localhost:9092.

    Args:
        url: Source file to download.
        topic: Kafka topic to publish to.

    Raises:
        requests.HTTPError: if the download does not return a success status.
    """
    response = requests.get(url)
    # Fail loudly instead of publishing an error page (e.g. "404: Not Found") as data.
    response.raise_for_status()

    producer = KafkaProducer(bootstrap_servers='localhost:9092')
    try:
        for line in response.text.splitlines()[1:]:
            # An empty line cannot be split into fields downstream, so skip it.
            if line:
                producer.send(topic, line.encode('utf-8'))
        producer.flush()
    finally:
        # Release the producer's network resources even if sending fails.
        producer.close()


def spark_kafka(topic='insurance'):
    """Consume insurance lines from Kafka with Spark and load them into MongoDB.

    Reads every message currently in ``topic`` (from the earliest offset) into an
    in-memory streaming table of the same name. Each tab-separated line is split
    into the seven insurance fields, and the rows are appended to
    health_insurance.health_insurance_insurancecharges.

    Args:
        topic: Kafka topic to read; also used as the in-memory query/table name.
    """
    # NOTE(review): PYSPARK_SUBMIT_ARGS is only read when the JVM is launched. If a
    # SparkSession already exists in this kernel (the MariaDB cells create one), this
    # line has no effect, and the Kafka source must already be on the classpath.
    os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.1 pyspark-shell'
    spark = SparkSession.builder.getOrCreate()

    raw_kafka_df = (spark.readStream
                    .format("kafka")
                    .option("kafka.bootstrap.servers", "localhost:9092")
                    .option("subscribe", topic)
                    .option("startingOffsets", "earliest")
                    .load())

    kafka_value_df = raw_kafka_df.selectExpr("CAST(value AS STRING)")

    # trigger(once=True) drains everything currently in the topic in one micro-batch
    # and then stops, so awaitTermination() returns only after all messages are in
    # the memory table. The earlier fixed 10 s wait could miss rows. It also left the
    # query running, so re-running the cell failed because the query name was still active.
    output_query = (kafka_value_df.writeStream
                    .queryName(topic)
                    .format("memory")
                    .trigger(once=True)
                    .start())
    output_query.awaitTermination()

    value_df = spark.sql("select * from `{}`".format(topic))
    value_df.show()

    value_rdd = value_df.rdd.map(lambda rec: rec['value'].split("\t"))
    # age and charges are converted to numbers, like bmi and children, so that
    # MongoDB stores values that can be sorted and aggregated numerically.
    value_row_rdd = value_rdd.map(lambda f: Row(age=int(f[0]),
                                                sex=f[1],
                                                bmi=float(f[2]),
                                                children=int(f[3]),
                                                smoker=f[4],
                                                region=f[5],
                                                charges=float(f[6])))

    df = spark.createDataFrame(value_row_rdd)
    df.show()

    (df.write.format("com.mongodb.spark.sql.DefaultSource")
        .mode('append')
        .option('database', 'health_insurance')
        .option('collection', 'health_insurance_insurancecharges')
        .option('uri', "mongodb://127.0.0.1/health_insurance.dbs")
        .save())

def main():
    """Run this cell's pipeline: publish the file to Kafka, then load it into MongoDB."""
    for stage in (kafka_prod, spark_kafka):
        stage()


main()

Part 2. Benefits cost sharing: database `health_insurance`, collection `health_insurance_benefit`. Note: the source text file is very large, so it is split into four parts; each part is loaded into the same collection by its own cell below.

In [None]:
# This will get the data from the following url: 
# https://raw.githubusercontent.com/platformps/Healthcare-Insurance-Data/master/BenefitsCostSharing_partOne.txt
import requests, os
from kafka import KafkaProducer
from pyspark.sql import SparkSession, Row
from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.sql import SQLContext

def kafka_prod(url="https://raw.githubusercontent.com/platformps/Healthcare-Insurance-Data/master/BenefitsCostSharing_partOne.txt",
               topic='benefit1'):
    """Download a delimited text file and publish each data line to a Kafka topic.

    The first line (the header) is skipped. Every remaining non-empty line is sent
    as one UTF-8 encoded message to the broker at localhost:9092.

    Args:
        url: Source file to download.
        topic: Kafka topic to publish to.

    Raises:
        requests.HTTPError: if the download does not return a success status.
    """
    response = requests.get(url)
    # Fail loudly instead of publishing an error page (e.g. "404: Not Found") as data.
    response.raise_for_status()

    producer = KafkaProducer(bootstrap_servers='localhost:9092')
    try:
        for line in response.text.splitlines()[1:]:
            # An empty line cannot be split into fields downstream, so skip it.
            if line:
                producer.send(topic, line.encode('utf-8'))
        producer.flush()
    finally:
        # Release the producer's network resources even if sending fails.
        producer.close()


def spark_kafka(topic='benefit1'):
    """Consume BenefitsCostSharing lines from Kafka with Spark and load them into MongoDB.

    Reads every message currently in ``topic`` (from the earliest offset) into an
    in-memory streaming table of the same name. Each tab-separated line is split
    into eleven benefit fields, and the rows are appended to
    health_insurance.health_insurance_benefit.

    Args:
        topic: Kafka topic to read; also used as the in-memory query/table name.
    """
    # NOTE(review): PYSPARK_SUBMIT_ARGS is only read when the JVM is launched. If a
    # SparkSession already exists in this kernel (the MariaDB cells create one), this
    # line has no effect, and the Kafka source must already be on the classpath.
    os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.1 pyspark-shell'
    spark = SparkSession.builder.getOrCreate()

    raw_kafka_df = (spark.readStream
                    .format("kafka")
                    .option("kafka.bootstrap.servers", "localhost:9092")
                    .option("subscribe", topic)
                    .option("startingOffsets", "earliest")
                    .load())

    kafka_value_df = raw_kafka_df.selectExpr("CAST(value AS STRING)")

    # trigger(once=True) drains everything currently in the topic in one micro-batch
    # and then stops, so awaitTermination() returns only after all messages are in
    # the memory table. The earlier fixed 10 s wait could miss rows. It also left the
    # query running, so re-running the cell failed because the query name was still active.
    output_query = (kafka_value_df.writeStream
                    .queryName(topic)
                    .format("memory")
                    .trigger(once=True)
                    .start())
    output_query.awaitTermination()

    value_df = spark.sql("select * from `{}`".format(topic))
    value_df.show()

    value_rdd = value_df.rdd.map(lambda rec: rec['value'].split("\t"))
    value_row_rdd = value_rdd.map(lambda f: Row(BenefitName=f[0],
                                                BusinessYear=f[1],
                                                EHBVarReason=f[2],
                                                IsCovered=f[3],
                                                IssuerId=f[4],
                                                LimitQty=f[5],
                                                LimitUnit=f[6],
                                                MinimumStay=f[7],
                                                PlanId=f[8],
                                                SourceName=f[9],
                                                StateCode=f[10]))

    df = spark.createDataFrame(value_row_rdd)
    df.show()

    (df.write.format("com.mongodb.spark.sql.DefaultSource")
        .mode('append')
        .option('database', 'health_insurance')
        .option('collection', 'health_insurance_benefit')
        .option('uri', "mongodb://127.0.0.1/health_insurance.dbs")
        .save())

def main():
    """Run this cell's pipeline: publish the file to Kafka, then load it into MongoDB."""
    for stage in (kafka_prod, spark_kafka):
        stage()


main()

In [None]:
# This will get the data from the following url: 
# https://raw.githubusercontent.com/platformps/Healthcare-Insurance-Data/master/BenefitsCostSharing_partTwo.txt
import requests, os
from kafka import KafkaProducer
from pyspark.sql import SparkSession, Row
from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.sql import SQLContext

def kafka_prod(url="https://raw.githubusercontent.com/platformps/Healthcare-Insurance-Data/master/BenefitsCostSharing_partTwo.txt",
               topic='benefit2'):
    """Download a delimited text file and publish each data line to a Kafka topic.

    The first line (the header) is skipped. Every remaining non-empty line is sent
    as one UTF-8 encoded message to the broker at localhost:9092.

    Args:
        url: Source file to download.
        topic: Kafka topic to publish to.

    Raises:
        requests.HTTPError: if the download does not return a success status.
    """
    response = requests.get(url)
    # Fail loudly instead of publishing an error page (e.g. "404: Not Found") as data.
    response.raise_for_status()

    producer = KafkaProducer(bootstrap_servers='localhost:9092')
    try:
        for line in response.text.splitlines()[1:]:
            # An empty line cannot be split into fields downstream, so skip it.
            if line:
                producer.send(topic, line.encode('utf-8'))
        producer.flush()
    finally:
        # Release the producer's network resources even if sending fails.
        producer.close()


def spark_kafka(topic='benefit2'):
    """Consume BenefitsCostSharing lines from Kafka with Spark and load them into MongoDB.

    Reads every message currently in ``topic`` (from the earliest offset) into an
    in-memory streaming table of the same name. Each tab-separated line is split
    into eleven benefit fields, and the rows are appended to
    health_insurance.health_insurance_benefit.

    Args:
        topic: Kafka topic to read; also used as the in-memory query/table name.
    """
    # NOTE(review): PYSPARK_SUBMIT_ARGS is only read when the JVM is launched. If a
    # SparkSession already exists in this kernel (the MariaDB cells create one), this
    # line has no effect, and the Kafka source must already be on the classpath.
    os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.1 pyspark-shell'
    spark = SparkSession.builder.getOrCreate()

    raw_kafka_df = (spark.readStream
                    .format("kafka")
                    .option("kafka.bootstrap.servers", "localhost:9092")
                    .option("subscribe", topic)
                    .option("startingOffsets", "earliest")
                    .load())

    kafka_value_df = raw_kafka_df.selectExpr("CAST(value AS STRING)")

    # trigger(once=True) drains everything currently in the topic in one micro-batch
    # and then stops, so awaitTermination() returns only after all messages are in
    # the memory table. The earlier fixed 10 s wait could miss rows. It also left the
    # query running, so re-running the cell failed because the query name was still active.
    output_query = (kafka_value_df.writeStream
                    .queryName(topic)
                    .format("memory")
                    .trigger(once=True)
                    .start())
    output_query.awaitTermination()

    value_df = spark.sql("select * from `{}`".format(topic))
    value_df.show()

    value_rdd = value_df.rdd.map(lambda rec: rec['value'].split("\t"))
    value_row_rdd = value_rdd.map(lambda f: Row(BenefitName=f[0],
                                                BusinessYear=f[1],
                                                EHBVarReason=f[2],
                                                IsCovered=f[3],
                                                IssuerId=f[4],
                                                LimitQty=f[5],
                                                LimitUnit=f[6],
                                                MinimumStay=f[7],
                                                PlanId=f[8],
                                                SourceName=f[9],
                                                StateCode=f[10]))

    df = spark.createDataFrame(value_row_rdd)
    df.show()

    (df.write.format("com.mongodb.spark.sql.DefaultSource")
        .mode('append')
        .option('database', 'health_insurance')
        .option('collection', 'health_insurance_benefit')
        .option('uri', "mongodb://127.0.0.1/health_insurance.dbs")
        .save())

def main():
    """Run this cell's pipeline: publish the file to Kafka, then load it into MongoDB."""
    for stage in (kafka_prod, spark_kafka):
        stage()


main()

In [None]:
# This will get the data from the following url: 
# https://raw.githubusercontent.com/platformps/Healthcare-Insurance-Data/master/BenefitsCostSharing_partThree.txt
import requests, os
from kafka import KafkaProducer
from pyspark.sql import SparkSession, Row
from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.sql import SQLContext

def kafka_prod(url="https://raw.githubusercontent.com/platformps/Healthcare-Insurance-Data/master/BenefitsCostSharing_partThree.txt",
               topic='benefit3'):
    """Download a delimited text file and publish each data line to a Kafka topic.

    The first line (the header) is skipped. Every remaining non-empty line is sent
    as one UTF-8 encoded message to the broker at localhost:9092.

    Args:
        url: Source file to download.
        topic: Kafka topic to publish to.

    Raises:
        requests.HTTPError: if the download does not return a success status.
    """
    response = requests.get(url)
    # Fail loudly instead of publishing an error page (e.g. "404: Not Found") as data.
    response.raise_for_status()

    producer = KafkaProducer(bootstrap_servers='localhost:9092')
    try:
        for line in response.text.splitlines()[1:]:
            # An empty line cannot be split into fields downstream, so skip it.
            if line:
                producer.send(topic, line.encode('utf-8'))
        producer.flush()
    finally:
        # Release the producer's network resources even if sending fails.
        producer.close()


def spark_kafka(topic='benefit3'):
    """Consume BenefitsCostSharing lines from Kafka with Spark and load them into MongoDB.

    Reads every message currently in ``topic`` (from the earliest offset) into an
    in-memory streaming table of the same name. Each tab-separated line is split
    into eleven benefit fields, and the rows are appended to
    health_insurance.health_insurance_benefit.

    Args:
        topic: Kafka topic to read; also used as the in-memory query/table name.
    """
    # NOTE(review): PYSPARK_SUBMIT_ARGS is only read when the JVM is launched. If a
    # SparkSession already exists in this kernel (the MariaDB cells create one), this
    # line has no effect, and the Kafka source must already be on the classpath.
    os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.1 pyspark-shell'
    spark = SparkSession.builder.getOrCreate()

    raw_kafka_df = (spark.readStream
                    .format("kafka")
                    .option("kafka.bootstrap.servers", "localhost:9092")
                    .option("subscribe", topic)
                    .option("startingOffsets", "earliest")
                    .load())

    kafka_value_df = raw_kafka_df.selectExpr("CAST(value AS STRING)")

    # trigger(once=True) drains everything currently in the topic in one micro-batch
    # and then stops, so awaitTermination() returns only after all messages are in
    # the memory table. The earlier fixed 10 s wait could miss rows. It also left the
    # query running, so re-running the cell failed because the query name was still active.
    output_query = (kafka_value_df.writeStream
                    .queryName(topic)
                    .format("memory")
                    .trigger(once=True)
                    .start())
    output_query.awaitTermination()

    value_df = spark.sql("select * from `{}`".format(topic))
    value_df.show()

    value_rdd = value_df.rdd.map(lambda rec: rec['value'].split("\t"))
    value_row_rdd = value_rdd.map(lambda f: Row(BenefitName=f[0],
                                                BusinessYear=f[1],
                                                EHBVarReason=f[2],
                                                IsCovered=f[3],
                                                IssuerId=f[4],
                                                LimitQty=f[5],
                                                LimitUnit=f[6],
                                                MinimumStay=f[7],
                                                PlanId=f[8],
                                                SourceName=f[9],
                                                StateCode=f[10]))

    df = spark.createDataFrame(value_row_rdd)
    df.show()

    (df.write.format("com.mongodb.spark.sql.DefaultSource")
        .mode('append')
        .option('database', 'health_insurance')
        .option('collection', 'health_insurance_benefit')
        .option('uri', "mongodb://127.0.0.1/health_insurance.dbs")
        .save())

def main():
    """Run this cell's pipeline: publish the file to Kafka, then load it into MongoDB."""
    for stage in (kafka_prod, spark_kafka):
        stage()


main()


In [None]:
# This will get the data from the following url: 
# https://raw.githubusercontent.com/platformps/Healthcare-Insurance-Data/master/BenefitsCostSharing_partFour.txt
import requests, os
from kafka import KafkaProducer
from pyspark.sql import SparkSession, Row
from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.sql import SQLContext

def kafka_prod(url="https://raw.githubusercontent.com/platformps/Healthcare-Insurance-Data/master/BenefitsCostSharing_partFour.txt",
               topic='benefitcost4'):
    """Download a delimited text file and publish each data line to a Kafka topic.

    The first line (the header) is skipped. Every remaining non-empty line is sent
    as one UTF-8 encoded message to the broker at localhost:9092.

    Args:
        url: Source file to download.
        topic: Kafka topic to publish to.

    Raises:
        requests.HTTPError: if the download does not return a success status.
    """
    response = requests.get(url)
    # Fail loudly instead of publishing an error page (e.g. "404: Not Found") as data.
    response.raise_for_status()

    producer = KafkaProducer(bootstrap_servers='localhost:9092')
    try:
        for line in response.text.splitlines()[1:]:
            # An empty line cannot be split into fields downstream, so skip it.
            if line:
                producer.send(topic, line.encode('utf-8'))
        producer.flush()
    finally:
        # Release the producer's network resources even if sending fails.
        producer.close()


def spark_kafka(topic='benefitcost4'):
    """Consume BenefitsCostSharing lines from Kafka with Spark and load them into MongoDB.

    Reads every message currently in ``topic`` (from the earliest offset) into an
    in-memory streaming table of the same name. Each tab-separated line is split
    into eleven benefit fields, and the rows are appended to
    health_insurance.health_insurance_benefit.

    Args:
        topic: Kafka topic to read; also used as the in-memory query/table name.
    """
    # NOTE(review): PYSPARK_SUBMIT_ARGS is only read when the JVM is launched. If a
    # SparkSession already exists in this kernel (the MariaDB cells create one), this
    # line has no effect, and the Kafka source must already be on the classpath.
    os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.1 pyspark-shell'
    spark = SparkSession.builder.getOrCreate()

    raw_kafka_df = (spark.readStream
                    .format("kafka")
                    .option("kafka.bootstrap.servers", "localhost:9092")
                    .option("subscribe", topic)
                    .option("startingOffsets", "earliest")
                    .load())

    kafka_value_df = raw_kafka_df.selectExpr("CAST(value AS STRING)")

    # trigger(once=True) drains everything currently in the topic in one micro-batch
    # and then stops, so awaitTermination() returns only after all messages are in
    # the memory table. The earlier fixed 10 s wait could miss rows. It also left the
    # query running, so re-running the cell failed because the query name was still active.
    output_query = (kafka_value_df.writeStream
                    .queryName(topic)
                    .format("memory")
                    .trigger(once=True)
                    .start())
    output_query.awaitTermination()

    value_df = spark.sql("select * from `{}`".format(topic))
    value_df.show()

    value_rdd = value_df.rdd.map(lambda rec: rec['value'].split("\t"))
    value_row_rdd = value_rdd.map(lambda f: Row(BenefitName=f[0],
                                                BusinessYear=f[1],
                                                EHBVarReason=f[2],
                                                IsCovered=f[3],
                                                IssuerId=f[4],
                                                LimitQty=f[5],
                                                LimitUnit=f[6],
                                                MinimumStay=f[7],
                                                PlanId=f[8],
                                                SourceName=f[9],
                                                StateCode=f[10]))

    df = spark.createDataFrame(value_row_rdd)
    df.show()

    (df.write.format("com.mongodb.spark.sql.DefaultSource")
        .mode('append')
        .option('database', 'health_insurance')
        .option('collection', 'health_insurance_benefit')
        .option('uri', "mongodb://127.0.0.1/health_insurance.dbs")
        .save())

def main():
    """Run this cell's pipeline: publish the file to Kafka, then load it into MongoDB."""
    for stage in (kafka_prod, spark_kafka):
        stage()


main()


Part 3. Service areas: database `health_insurance`, collection `health_insurance_service`.

In [None]:
# This will get the data from the following url: 
# https://raw.githubusercontent.com/platformps/Healthcare-Insurance-Data/master/ServiceArea.csv
import requests, os
from kafka import KafkaProducer
from pyspark.sql import SparkSession, Row

def kafka_prod(url="https://raw.githubusercontent.com/platformps/Healthcare-Insurance-Data/master/ServiceArea.csv",
               topic='service2'):
    """Download a delimited text file and publish each data line to a Kafka topic.

    The first line (the header) is skipped. Every remaining non-empty line is sent
    as one UTF-8 encoded message to the broker at localhost:9092.

    Args:
        url: Source file to download.
        topic: Kafka topic to publish to.

    Raises:
        requests.HTTPError: if the download does not return a success status.
    """
    response = requests.get(url)
    # Fail loudly instead of publishing an error page (e.g. "404: Not Found") as data.
    response.raise_for_status()

    producer = KafkaProducer(bootstrap_servers='localhost:9092')
    try:
        for line in response.text.splitlines()[1:]:
            # An empty line cannot be split into fields downstream, so skip it.
            if line:
                producer.send(topic, line.encode('utf-8'))
        producer.flush()
    finally:
        # Release the producer's network resources even if sending fails.
        producer.close()


def spark_kafka(topic='service2'):
    """Consume ServiceArea CSV lines from Kafka with Spark and load them into MongoDB.

    Reads every message currently in ``topic`` (from the earliest offset) into an
    in-memory streaming table of the same name. Each line is parsed as one CSV
    record of 18 columns, and the rows are appended to
    health_insurance.health_insurance_service.

    Args:
        topic: Kafka topic to read; also used as the in-memory query/table name.
    """
    # NOTE(review): PYSPARK_SUBMIT_ARGS is only read when the JVM is launched. If a
    # SparkSession already exists in this kernel (the MariaDB cells create one), this
    # line has no effect, and the Kafka source must already be on the classpath.
    os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.1 pyspark-shell'
    spark = SparkSession.builder.config("spark.mongodb.output.uri", "mongodb://127.0.0.1/").getOrCreate()

    raw_kafka_df = (spark.readStream
                    .format("kafka")
                    .option("kafka.bootstrap.servers", "localhost:9092")
                    .option("subscribe", topic)
                    .option("startingOffsets", "earliest")
                    .load())

    kafka_value_df = raw_kafka_df.selectExpr("CAST(value AS STRING)")

    # trigger(once=True) drains everything currently in the topic in one micro-batch
    # and then stops, so awaitTermination() returns only after all messages are in
    # the memory table. The earlier fixed 10 s wait could miss rows. It also left the
    # query running, so re-running the cell failed because the query name was still active.
    output_query = (kafka_value_df.writeStream
                    .queryName(topic)
                    .format("memory")
                    .trigger(once=True)
                    .start())
    output_query.awaitTermination()

    value_df = spark.sql("select * from `{}`".format(topic))
    value_df.show()

    def _split_csv(line):
        # csv.reader keeps a quoted value that contains commas as one field, whereas
        # str.split(",") would break it apart and shift every later column.
        import csv
        return next(csv.reader([line]), [])

    value_rdd = value_df.rdd.map(lambda rec: _split_csv(rec['value']))
    value_row_rdd = value_rdd.map(lambda f: Row(BusinessYear=int(f[0]),
                                                StateCode=f[1],
                                                IssuerId=int(f[2]),
                                                SourceName=f[3],
                                                VersionNum=int(f[4]),
                                                ImportDate=f[5],
                                                IssuerId2=int(f[6]),
                                                StateCode2=f[7],
                                                ServiceAreaId=f[8],
                                                ServiceAreaName=f[9],
                                                CoverEntireState=f[10],
                                                County=f[11],
                                                PartialCounty=f[12],
                                                ZipCodes=f[13],
                                                PartialCountyJustification=f[14],
                                                RowNumber=f[15],
                                                MarketCoverage=f[16],
                                                DentalOnlyPlan=f[17]))

    df = spark.createDataFrame(value_row_rdd)
    df.show()

    # The uri is passed explicitly because getOrCreate() may have returned an existing
    # session, on which the spark.mongodb.output.uri builder setting above has no effect.
    (df.write.format("com.mongodb.spark.sql.DefaultSource")
        .mode('append')
        .option('database', 'health_insurance')
        .option('collection', 'health_insurance_service')
        .option('uri', "mongodb://127.0.0.1/health_insurance.dbs")
        .save())

def main():
    """Run this cell's pipeline: publish the file to Kafka, then load it into MongoDB."""
    for stage in (kafka_prod, spark_kafka):
        stage()


main()

Part 4. Plan attributes: database `health_insurance`, collection `health_insurance_plan`.

In [None]:
# This will get the data from the following url: 
# https://raw.githubusercontent.com/platformps/Healthcare-Insurance-Data/master/PlanAttributes.csv
import requests, os
from kafka import KafkaProducer
from pyspark.sql import SparkSession, Row
from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.sql import SQLContext

def kafka_prod(url="https://raw.githubusercontent.com/platformps/Healthcare-Insurance-Data/master/PlanAttributes.csv",
               topic='plans'):
    """Download a delimited text file and publish each data line to a Kafka topic.

    The first line (the header) is skipped. Every remaining non-empty line is sent
    as one UTF-8 encoded message to the broker at localhost:9092.

    Args:
        url: Source file to download.
        topic: Kafka topic to publish to.

    Raises:
        requests.HTTPError: if the download does not return a success status.
    """
    response = requests.get(url)
    # Fail loudly instead of publishing an error page (e.g. "404: Not Found") as data.
    response.raise_for_status()

    producer = KafkaProducer(bootstrap_servers='localhost:9092')
    try:
        for line in response.text.splitlines()[1:]:
            # An empty line cannot be split into fields downstream, so skip it.
            if line:
                producer.send(topic, line.encode('utf-8'))
        producer.flush()
    finally:
        # Release the producer's network resources even if sending fails.
        producer.close()


def spark_kafka():
    """Consume the ``plans`` Kafka topic with Spark and append the rows to MongoDB.

    Steps:
    1. Read every message in the topic, from the earliest offset, into an
       in-memory Spark table named ``plans``.
    2. Split each message on tab characters and build a ``Row``.
    3. Append the resulting DataFrame to the
       ``health_insurance.health_insurance_plan`` MongoDB collection.
    """
    # NOTE(review): PYSPARK_SUBMIT_ARGS is only read when the Spark JVM starts.
    # If a SparkSession already exists in this kernel, this line has no effect.
    # Confirm the Kafka package is on the classpath.
    os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.1 pyspark-shell'
    spark = SparkSession.builder.getOrCreate()

    raw_kafka_df = spark.readStream \
                        .format("kafka") \
                        .option("kafka.bootstrap.servers", "localhost:9092") \
                        .option("subscribe", 'plans') \
                        .option("startingOffsets", "earliest") \
                        .load()

    kafka_value_df = raw_kafka_df.selectExpr("CAST(value AS STRING)")

    output_query = kafka_value_df.writeStream \
                          .queryName("plans") \
                          .format("memory") \
                          .start()
    try:
        # Block until every message already in the topic has reached the memory
        # table. A fixed 10-second wait could silently load only part of it.
        output_query.processAllAvailable()
    finally:
        # Stop the stream so re-running this cell does not fail because a query
        # named "plans" is still active. The memory table remains queryable.
        output_query.stop()

    value_df = spark.sql("select * from plans")
    value_df.show()

    # NOTE(review): two things to confirm against the PlanAttributes header:
    # - the rows are split on tabs;
    # - bmi/children/smoker/region/charges look copied from the Insurance
    #   dataset rather than taken from the PlanAttributes columns.
    value_rdd = value_df.rdd.map(lambda i: i['value'].split("\t"))
    value_row_rdd = value_rdd.map(lambda i: Row(AttributesID=int(i[0]),
                                                BeginPrimaryCareCostSharingAfterNumberOfVisits=int(i[1]),
                                                bmi=float(i[2]),
                                                children=int(i[3]),
                                                smoker=i[4],
                                                region=i[5],
                                                charges=i[6]))

    df = spark.createDataFrame(value_row_rdd)
    df.show()

    df.write.format("com.mongodb.spark.sql.DefaultSource") \
         .mode('append') \
         .option('database','health_insurance') \
         .option('collection', 'health_insurance_plan') \
         .option('uri', "mongodb://127.0.0.1/health_insurance.dbs") \
         .save()

def main():
    """Publish the PlanAttributes rows (``kafka_prod``), then load them (``spark_kafka``)."""
    for stage in (kafka_prod, spark_kafka):
        stage()


main()


Part 5. Network. In this code the database is named health_insurance and the collection is named health_insurance_network.

In [None]:
# This will get the data from the following url: 
# https://raw.githubusercontent.com/platformps/Healthcare-Insurance-Data/master/Network.csv
import requests, os
from kafka import KafkaProducer
from pyspark.sql import SparkSession, Row

def kafka_prod(url="https://raw.githubusercontent.com/platformps/Healthcare-Insurance-Data/master/Network.csv",
               topic='network1',
               bootstrap_servers='localhost:9092',
               timeout=60):
    """Download a CSV file and publish each data row to a Kafka topic.

    The first (header) line is skipped. Every remaining non-blank line is sent
    unchanged, UTF-8 encoded, as one Kafka message.

    Args:
        url: location of the CSV file to download.
        topic: Kafka topic that receives one message per data row.
        bootstrap_servers: Kafka broker address(es).
        timeout: seconds passed to ``requests.get`` so a stalled download
            raises instead of hanging the notebook forever.

    Raises:
        requests.HTTPError: if the download returns a non-2xx status. Without
            this, the error page itself would be published as data.
    """
    response = requests.get(url, timeout=timeout)
    response.raise_for_status()

    producer = KafkaProducer(bootstrap_servers=bootstrap_servers)
    try:
        for line in response.text.splitlines()[1:]:
            # Blank lines would become empty messages that fail the int() parse
            # in spark_kafka.
            if line.strip():
                producer.send(topic, line.encode('utf-8'))
        producer.flush()
    finally:
        # Release the producer's network resources even if sending fails.
        producer.close()


def spark_kafka():
    """Consume the ``network1`` Kafka topic with Spark and append the rows to MongoDB.

    Steps:
    1. Read every message in the topic, from the earliest offset, into an
       in-memory Spark table named ``network1``.
    2. Parse each message as one CSV record into a 14-field ``Row``.
    3. Append the resulting DataFrame to the
       ``health_insurance.health_insurance_network`` MongoDB collection.
    """
    import csv  # function-scope: only this cell's parser needs it

    # NOTE(review): PYSPARK_SUBMIT_ARGS is only read when the Spark JVM starts.
    # If a SparkSession already exists in this kernel, this line has no effect.
    os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.1 pyspark-shell'
    spark = SparkSession.builder.config("spark.mongodb.output.uri", "mongodb://127.0.0.1/").getOrCreate()

    raw_kafka_df = spark.readStream \
                        .format("kafka") \
                        .option("kafka.bootstrap.servers", "localhost:9092") \
                        .option("subscribe", 'network1') \
                        .option("startingOffsets", "earliest") \
                        .load()

    kafka_value_df = raw_kafka_df.selectExpr("CAST(value AS STRING)")

    output_query = kafka_value_df.writeStream \
                          .queryName("network1") \
                          .format("memory") \
                          .start()
    try:
        # Block until every message already in the topic has reached the memory
        # table. A fixed 10-second wait could silently load only part of it.
        output_query.processAllAvailable()
    finally:
        # Stop the stream so re-running this cell does not fail because a query
        # named "network1" is still active. The memory table remains queryable.
        output_query.stop()

    value_df = spark.sql("select * from network1")
    value_df.show()

    # Parse with the csv module instead of str.split(","). A quoted field that
    # contains a comma then stays one field, so the 14 positions below stay aligned.
    value_rdd = value_df.rdd.map(lambda i: next(csv.reader([i["value"]])))
    value_row_rdd = value_rdd.map(lambda i: Row(BusinessYear=i[0],
                                                StateCode=i[1],
                                                IssuerId=int(i[2]),
                                                SourceName=i[3],
                                                VersionNum=int(i[4]),
                                                ImportDate=i[5],
                                                IssuerId2=int(i[6]),
                                                StateCode2=i[7],
                                                NetworkName=i[8],
                                                NetworkId=i[9],
                                                NetworkURL=i[10],
                                                RowNumber=i[11],
                                                MarketCoverage=i[12],
                                                DentalOnlyPlan=i[13]))

    df = spark.createDataFrame(value_row_rdd)
    df.show()

    # The target server comes from spark.mongodb.output.uri set on the session above.
    df.write.format("com.mongodb.spark.sql.DefaultSource") \
        .mode('append') \
        .option('database','health_insurance') \
        .option('collection', 'health_insurance_network') \
        .save()

def main(produce=False):
    """Load the Network data from the ``network1`` Kafka topic into MongoDB.

    Args:
        produce: when True, first publish Network.csv to the topic with
            ``kafka_prod``. Defaults to False, which assumes an earlier run
            already populated the topic. Producing again appends duplicate
            messages, and the ``startingOffsets=earliest`` consumer would then
            append them to MongoDB a second time.
    """
    if produce:
        kafka_prod()
    spark_kafka()


main()


# 4. Data Visualization Questions

(A) Use the “Service Area” dataset from MongoDB. Find and plot the counts of ServiceAreaName, SourceName, and BusinessYear for each state across the country.

In [None]:
# Load the Service Area collection from MongoDB and plot, for each state, how many
# records have a ServiceAreaName, SourceName and BusinessYear value.
# `client`, `db`, `data` and `state_count_df` are reused by the cells below.
import pymongo
from pymongo import MongoClient
import pandas as pd
import matplotlib.pyplot as plt
client = MongoClient('localhost', 27017)
db = client.health_insurance
collection = db.health_insurance_service
data = pd.DataFrame(list(collection.find()))
# groupby().count() counts the non-null values of every column within each state.
state_count_df = data.groupby('StateCode').count()
a = state_count_df[['ServiceAreaName', 'SourceName', 'BusinessYear']]
ax = a.plot.bar(figsize=(10, 10))
ax.set(title='Service Area records per state', xlabel='State', ylabel='Count')
plt.show()

(B) Use the “Service Area” dataset from MongoDB. Find and plot the count of “sources” across the country.

In [None]:
# Plot how many Service Area records come from each SourceName across the whole
# country. Uses the `data` frame loaded in question (A).
source_count_df = data.groupby('SourceName').size().to_frame('count')
b = source_count_df['count'].sort_values(ascending=False)
ax = b.plot.bar(figsize=(10, 10))
ax.set(title='Service Area records per source', xlabel='SourceName', ylabel='Count')
plt.show()

(C) Use the “Benefit Cost Sharing” dataset from MongoDB. Find and plot the number of benefit plans in each state.

In [None]:
# Load the Benefit Cost Sharing collection from MongoDB and plot the number of
# benefit records in each state.
import pymongo
from pymongo import MongoClient
import pandas as pd
import matplotlib.pyplot as plt
client = MongoClient('localhost', 27017)
# Bind db to this cell's client so the cell does not depend on a `db` left over
# from an earlier cell.
db = client.health_insurance
d1 = db.health_insurance_benefit
d2 = pd.DataFrame(list(d1.find()))
# NOTE(review): this counts BenefitName rows per state, not distinct plans. If
# "benefit plans" means plans, count a plan-id column with nunique() instead.
# Confirm the intended metric.
d3 = d2.groupby('StateCode')['BenefitName'].count()
ax = d3.plot.bar(figsize=(10, 10))
ax.set(title='Benefit plans per state', xlabel='State', ylabel='Number of benefit records')
plt.show()
# The plot shows the benefit plans in each state; Wisconsin has the highest count in the country.

(D) Use the “Insurance” dataset from MongoDB and find the number of mothers who smoke and also have children.

In [None]:
# Load the Insurance collection from MongoDB into a pandas DataFrame (`e2`),
# which the next cell uses.
import pymongo
from pymongo import MongoClient
import pandas as pd
import matplotlib.pyplot as plt
client = MongoClient('localhost', 27017)
# Bind db to this cell's client instead of reusing `db` from an earlier cell.
db = client.health_insurance
e1 = db.health_insurance_insurancecharges
e2 = pd.DataFrame(list(e1.find()))

In [None]:
# Mothers are rows with sex == 'female' and at least one child; count the smokers among them.
smoking_mothers = e2[(e2['sex'] == 'female')
                     & (e2['smoker'] == 'yes')
                     & (e2['children'] > 0)]
# Build one message string so print shows a sentence rather than a tuple repr.
e3 = f'The number of mother who smoke and also have children are {len(smoking_mothers)}'
print(e3)
# According to the dataset, 124 mothers smoke and also have children.


(E) Use the “Insurance” dataset from MongoDB. Find out which region has the highest rate of smokers, and plot the results for each region.

In [None]:
# Load the Insurance collection and plot the share of smokers in each region.
import pymongo
from pymongo import MongoClient
import pandas as pd
import matplotlib.pyplot as plt
client = MongoClient('localhost', 27017)
print('Connect to the MongoDB Client')
# Bind db to this cell's client instead of reusing `db` from an earlier cell.
db = client.health_insurance
f1 = db.health_insurance_insurancecharges
f2 = pd.DataFrame(list(f1.find()))
f3 = f2.groupby('region').size()  # rows per region
# Count smokers over every region, so a region with no smokers gets 0.
# Filtering first would drop that region and turn its rate into NaN.
sm = f2['smoker'].eq('yes').groupby(f2['region']).sum()
xyz = sm / f3
ax = xyz.plot.bar(figsize=(10, 10))
ax.set(title='Smoker rate by region', xlabel='Region', ylabel='Share of smokers')
plt.show()
# The graph shows that the Southeastern region has the highest rate of smokers.

# 5. User Menu

In [None]:
import branch
import creditcard
import customer
import Benefit1
import Benefit2
import Benefit3
import Benefit4
import Insurance
import PlansAttribute
import Network
import ServiceArea
import VA
import VB
import VD
import VE
import VF

def _run_case_study(*module_names):
    """Execute the top-level code of each ``Case_Study.<name>`` module, in order.

    Uses ``runpy.run_module`` rather than ``import``. An import is cached in
    ``sys.modules``, so a report would only run the first time it was chosen
    from the menu.
    """
    import runpy  # local import: only the menu needs it
    for name in module_names:
        runpy.run_module('Case_Study.' + name)


def main():
    """Interactive console menu that runs the Case_Study report modules.

    Loops until the user enters '0' at the top-level prompt. Any unrecognised
    sub-menu choice returns to the top-level prompt.
    """
    print('Hello!')
    entry = None
    while entry != '0':
        entry = input('\n1) If you would like to see credit card data please press 1\
                    \n2) If you would like to see Health Insurance data please press 2\
                    \n3) If you would like to visualize or analyze uploaded data please press 3\
                    \n4) Otherwise please press 0, and you will be logged out from the system\
                    \n\nPlease choose one of the following options 1, 2, 3, or 0:\
                    \n-------- ')
        if entry =='1':
            case = input('\n1) If you would like to see branch data please press 1\
                    \n2) If you would like to see credit card data please press 2\
                    \n3) If you would like to see customers data please press 3\
                    \n4) Otherwise please press 0, and you will return to the previous menu\
                    \n-------- ')
            if case =='1':
                _run_case_study('branch')
            elif case=='2':
                _run_case_study('creditcard')
            elif case=='3':
                _run_case_study('customer')
            else:
                print('You are returned to main menu')
        elif entry =='2':
            case = input('\nHealth Insurance Marketplace Data Files:\
                           \n\t1) BenefitsCostSharing\
                           \n\t2) Insurance\
                           \n\t3) PlanAttributes\
                           \n\t4) Network\
                           \n\t5) ServiceArea\
                           \n\t6) Otherwise please press g, and you will return to the previous menu\
                           \n\nPlease select anything from the list\
                           \n----------- ')
            if case == '1':
                _run_case_study('Benefit1', 'Benefit2', 'Benefit3', 'Benefit4')
            elif case =='2':
                _run_case_study('Insurance')
            elif case =='3':
                _run_case_study('PlansAttribute')
            elif case =='4':
                _run_case_study('Network')
            elif case =='5':
                _run_case_study('ServiceArea')
            else:
                print('You are returned to main menu')            
        elif entry =='3':
            case = input('\nChoose one of the following options:\
            \n\t1) Here you can see counts of ServiceAreaName, SourceName, and BusinessYear by state\
                           \n\t2) Here you can see the counts of sources across the country\
                           \n\t3) Invalid option\
                           \n\t4) Here you can see the number of benefit plans in each state\
                           \n\t5) Here you can see quantity of smoking mother\
                           \n\t6) Here you can see highest smoking region\
                           \n\t7) Otherwise please press 0, and you will return to the previous menu\
                           \n\nPlease select an option from the list\
                           \n------- ')
            if case=='1':
                _run_case_study('VA')
            elif case =='2':
                _run_case_study('VB')
            elif case =='4':
                _run_case_study('VD')
            elif case =='5':
                _run_case_study('VE')
            elif case =='6':
                _run_case_study('VF')
            else:
                print('Main Menu')
    print('Closed')


if __name__=='__main__':
    main()


# 6. Conclusion