# Read CSV file

In [0]:
# Where the input file lives and which Spark data source should parse it
file_location = "/FileStore/tables/diabetes.csv"
file_type = "csv"

# Reader settings, passed to Spark as strings
infer_schema = "true"          # forwarded as the "inferSchema" option
first_row_is_header = "true"   # forwarded as the "header" option
delimiter = ","                # forwarded as the "sep" option

# Load the file into df. The three options below are CSV options;
# for other file types, these will be ignored.
df = (
    spark.read.format(file_type)
    .option("inferSchema", infer_schema)
    .option("header", first_row_is_header)
    .option("sep", delimiter)
    .load(file_location)
)



In [0]:
# Preview df as a text table.
# NOTE(review): the saved output under this cell already shows a `label` column and
# no `Pregnancies` column, while the null-check cell further down still lists
# `Pregnancies` and `Outcome`. The output was evidently captured after the later
# rename/drop cells had run -- restart and run all cells in order to refresh it.
df.show()

+-------+-------------+-------------+-------+----+------------------------+---+-----+
|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|label|
+-------+-------------+-------------+-------+----+------------------------+---+-----+
|    148|           72|           35|      0|33.6|                   0.627| 50|    1|
|     85|           66|           29|      0|26.6|                   0.351| 31|    0|
|    183|           64|            0|      0|23.3|                   0.672| 32|    1|
|     89|           66|           23|     94|28.1|                   0.167| 21|    0|
|    137|           40|           35|    168|43.1|                   2.288| 33|    1|
|    116|           74|            0|      0|25.6|                   0.201| 30|    0|
|     78|           50|           32|     88|31.0|                   0.248| 26|    1|
|    115|            0|            0|      0|35.3|                   0.134| 29|    0|
|    197|           70|           45|    543|30.5|    

In [0]:
# Number of rows in the dataframe. In notebook order this runs before the BMI
# filter and the limit() applied further down, so it describes the loaded data.
df.count()

Out[42]: 768

In [0]:
# Per-column tally of missing entries: a value is treated as missing
# when it is NaN or NULL. The result is shown as a single row of totals.
from pyspark.sql.functions import col, count, isnan, when
df.select(
    [
        count(
            when(isnan(column_name) | col(column_name).isNull(), column_name)
        ).alias(column_name)
        for column_name in df.columns
    ]
).show()

+-----------+-------+-------------+-------------+-------+---+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin|BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+---+------------------------+---+-------+
|          0|      0|            0|            0|      0|  0|                       0|  0|      0|
+-----------+-------+-------------+-------------+-------+---+------------------------+---+-------+



In [0]:
# Keep only the rows whose BMI is strictly positive.
# NOTE(review): presumably a BMI of 0 stands for a missing measurement (the
# NaN/NULL check above would not catch it) -- confirm against the data source.
df = df.where(df["BMI"] > 0)

In [0]:
# Rename the "Outcome" column to "label".
# The column name is spelled exactly as it appears in the loaded data ("Outcome",
# capital O). The lowercase spelling only matched through case-insensitive column
# resolution, and withColumnRenamed does nothing (without raising) when the name
# does not resolve -- which would leave df without a "label" column.
df = df.withColumnRenamed('Outcome', 'label')

In [0]:
# Remove the 'Pregnancies' column; the remaining columns are kept unchanged.
df = df.drop('Pregnancies')

In [0]:
# Show the df again to check the result of the filter, rename and drop steps above.
df.show()

+-------+-------------+-------------+-------+----+------------------------+---+-----+
|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|label|
+-------+-------------+-------------+-------+----+------------------------+---+-----+
|    148|           72|           35|      0|33.6|                   0.627| 50|    1|
|     85|           66|           29|      0|26.6|                   0.351| 31|    0|
|    183|           64|            0|      0|23.3|                   0.672| 32|    1|
|     89|           66|           23|     94|28.1|                   0.167| 21|    0|
|    137|           40|           35|    168|43.1|                   2.288| 33|    1|
|    116|           74|            0|      0|25.6|                   0.201| 30|    0|
|     78|           50|           32|     88|31.0|                   0.248| 26|    1|
|    115|            0|            0|      0|35.3|                   0.134| 29|    0|
|    197|           70|           45|    543|30.5|    

In [0]:
# Keep at most 10 rows of the df; these are the rows written to MongoDB below.
# NOTE(review): this comment previously said 100 rows while the code passes 10 --
# confirm which sample size is intended.
df = df.limit(10)

# Add the MongoDB Connector as a Library
#### 1. Navigate to the cluster detail page and select the Libraries tab.
#### 2. Click the Install New button.
#### 3. Select Maven as the Library Source.
#### 4. Enter the MongoDB Connector for Spark package value into the Coordinates field, based on your Databricks Runtime version:
#### - For Databricks Runtime 7.0.0 and above, enter org.mongodb.spark:mongo-spark-connector_2.12:3.0.0.
#### - For Databricks Runtime 5.5 LTS and 6.x, enter org.mongodb.spark:mongo-spark-connector_2.11:2.3.4.
#### 5. Click Install.

# Configure the Databricks Cluster with the MongoDB Connection URI
#### 1. Get the MongoDB connection URI: in the MongoDB Atlas UI, click the cluster you created.
#### 2. Click the Connect button.
#### 3. Click Connect Your Application.
#### 4. Select Python in the Driver dropdown and 6 or later in the Version dropdown.
#### Copy the generated connection string. It should look like mongodb+srv://\<user>:\<password>@\<cluster>.kisanfz.mongodb.net/?retryWrites=true&w=majority
#### Replace the user, password, and cluster-name placeholders with your own values.
#### 5. In the cluster detail page for your Databricks cluster, select the Configuration tab.
#### Click the Edit button.
#### Under Advanced Options, select the Spark configuration tab and add the connection string you copied in the previous step to the Spark Config:
#### spark.mongodb.output.uri \<connection-string>
#### spark.mongodb.input.uri \<connection-string>

In [0]:
# Write the df to MongoDB: database "testR", collection "D", via the "mongo" data source.
# The connection URI is not set here; per the setup notes above it comes from the
# cluster's Spark config (spark.mongodb.output.uri).
# NOTE(review): the save mode is "append", so presumably re-running this cell
# adds the same rows to the collection again -- confirm that is acceptable.
(
    df.write
    .format("mongo")
    .option("database", 'testR')
    .option("collection", "D")
    .mode("append")
    .save()
)