In [1]:
import pyspark
from pyspark.sql.types import IntegerType, StringType, StructField, StructType

In [2]:
# In Databricks, a structured file (e.g. CSV) can be uploaded to DBFS (the Databricks File System) and converted into a table.

#Approach 1:
## We have uploaded the file to DBFS and converted it into a table using the GUI.
## The created table is named mass_train.

In [3]:
# Load the GUI-created table `mass_train` (all columns) as a DataFrame via sqlContext.
df = sqlContext.table("mass_train")

In [4]:
# Print the first rows of the table as text (20 rows is show()'s default).
df.show(n=20)

In [5]:
# Approach 2
## Read the CSV files directly from DBFS. The directory contains:
###   /FileStore/tables/FileStore/tables/mass_case_description_train_set.csv
###   /FileStore/tables/FileStore/tables/mass_case_description_test_set.csv
## Loading the directory (not a single file) reads both CSVs into one DataFrame.
## header=True: the first line of each file supplies the column names.
## inferSchema=True: column types are inferred from the data (costs an extra pass).
sparkDF = sqlContext.read.format("csv").load(
    "/FileStore/tables/FileStore/tables/",
    header=True,
    inferSchema=True,
)

In [6]:
# Print the first rows of the combined train+test DataFrame (20 is show()'s default).
sparkDF.show(n=20)

In [7]:
## schema names
sparkDF.printSchema()

In [8]:
# Explicit schema for the mass-case CSV files. Streaming file sources need the schema
# up front; field order must match the column order in the files.
# (StructType/StructField/StringType/IntegerType come from pyspark.sql.types.)
csvSchema = StructType([
    StructField("patient_id", StringType(), True),
    StructField("breast_density", IntegerType(), True),
    StructField("left or right breast", StringType(), True),
    StructField("image view", StringType(), True),
    StructField("abnormality id", IntegerType(), True),
    StructField("abnormality type", StringType(), True),
    StructField("mass shape", StringType(), True),
    StructField("mass margins", StringType(), True),
    StructField("assessment", IntegerType(), True),
    StructField("pathology", StringType(), True),
    StructField("subtlety", IntegerType(), True),
])

# Read the same CSV directory as a stream using the schema above.
# These files carry a header row (the batch read of this directory uses header=True),
# so skip it here too; otherwise each header line would be ingested as a data row
# whose integer fields cannot be parsed.
csv_sdf = spark.readStream.csv(
    "/FileStore/tables/FileStore/tables/",
    schema=csvSchema,
    header=True,
)

In [9]:
# Sanity check: does the streaming DataFrame carry exactly the schema defined above?
csvSchema == csv_sdf.schema

In [10]:
# The usual first step in processing the data is to query it interactively, so define a
# static (batch) DataFrame over the same files the streaming query below will read.
# Based on: https://docs.databricks.com/spark/latest/structured-streaming/index.html#structured-streaming-python
# NOTE: the files under inpData must NOT contain a header row -- the schema supplies the
# column names and these reads do not set header=True.
# Example file: /FileStore/tables/data/mass_case_train_set.csv
inpData = "/FileStore/tables/data/"

# These files have exactly the same column layout as csvSchema (defined in the
# streaming-read cell above); reuse it rather than maintaining a second copy.
cSchema = csvSchema

staticInputDF = (
    spark
    .read
    .schema(cSchema)
    .csv(inpData)
)

In [11]:
# Show the static (batch) DataFrame with the Databricks notebook display() helper.
display(staticInputDF)

In [12]:
# Count rows per breast_density value and expose the result as a temp view.
# No time window is used: window() requires a timestamp column, and this dataset has
# none (cSchema holds only string/integer fields; patient_id is a string ID).
staticCountsDF = (
    staticInputDF
    .groupBy(staticInputDF.breast_density)
    .count()
)
staticCountsDF.cache()  # lazy: materialized on the first action over the view

# Register the DataFrame as the temporary view 'static_counts'.
staticCountsDF.createOrReplaceTempView("static_counts")

In [13]:
# Streaming version of the input: same definition as staticInputDF,
# just using `readStream` instead of `read`.
streamingInputDF = (
    spark
    .readStream
    .schema(cSchema)                  # set the schema of the CSV data (file streams need it up front)
    .option("maxFilesPerTrigger", 1)  # treat the files as a stream by picking one file per trigger
    .csv(inpData)
)

# Same aggregation as staticCountsDF: row count per breast_density value.
# No time window: the data has no timestamp column to window on.
streamingCountsDF = (
    streamingInputDF
    .groupBy(streamingInputDF.breast_density)
    .count()
)

# Is this DF actually a streaming DF?
streamingCountsDF.isStreaming

In [14]:
# Show the streaming input DataFrame with the Databricks notebook display() helper.
display(streamingInputDF)

In [15]:
# The dataset is small, so keep the number of shuffle partitions low.
spark.conf.set("spark.sql.shuffle.partitions", "2")

# Start the streaming aggregation. Its result table is kept in memory under the
# name "counts", so it can be inspected with SQL while the query runs.
query = (
    streamingCountsDF.writeStream
    .format("memory")        # in-memory sink: meant for debugging/testing, not production
    .queryName("counts")     # name of the in-memory table
    .outputMode("complete")  # write the entire result table on every trigger
    .start()
)

In [16]:
# sparkDF was loaded from the directory holding both the train and the test CSV,
# so it contains the rows of both sets.
display(sparkDF)


In [17]:
# Total number of rows in sparkDF (train + test combined); triggers a full scan.
sparkDF.count()

In [18]:
# Logistic regression
from pyspark.ml.classification import LogisticRegression
from pyspark.sql import SparkSession

# getOrCreate() hands back the already-active session when one exists (as in a
# Databricks notebook); otherwise it builds a new session named 'Logistic'.
spark = SparkSession.builder.appName('Logistic').getOrCreate()

In [19]:
# Logistic-regression estimator with default parameters. With no column arguments
# it reads a feature-vector column named "features" and a label column named "label".
# NOTE(review): sparkDF's columns come from the CSV headers and presumably include
# neither yet -- assemble a feature vector (e.g. VectorAssembler) and a numeric label
# (e.g. derived from pathology) before calling fit(). TODO confirm.
lg_model = LogisticRegression()