# Using Python and Scala Together
# <img src="https://www.python.org/static/community_logos/python-logo-generic.svg" width="350" /> &nbsp; + &nbsp; &nbsp; &nbsp; <img src="https://www.scala-lang.org/resources/img/scala-logo-red-spiral-dark.png" />

Databricks supports four different programming languages when writing notebooks:  Python, R, Scala, and SQL.  You can even [mix in different languages](https://docs.databricks.com/notebooks/notebooks-use.html#mix-languages) within
the same notebook.  This turns out to be a very powerful feature because data scientists who are comfortable using Python sometimes need to use custom Spark libraries, many of which are written in Scala.

In this notebook, we will show how a data scientist can use Python and the PySpark API to prepare some data.  We will show how the data from the Python environment can be shared with the Scala environment and how a custom
Scala library can be used to operate on that data. Finally, we will see that Python can be used to continue the data science process by picking up the data created by the Scala process.

For this example, we will be using the [Smart Imputation library](https://github.com/JMailloH/Smart_Imputation), which allows us to harness the power of Spark to do KNN imputation of missing values on a large dataset distributed
across a cluster.
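To make the idea of KNN imputation concrete before we involve Spark, here is a minimal single-machine sketch in plain Python. It is not the algorithm used by the Smart Imputation library, which distributes the neighbour search across the cluster. The function name `knn_impute`, the toy rows, and the choice to fill in the mean of the neighbours are all assumptions made for illustration.

```python
import math

def knn_impute(rows, target, k=3):
    """Fill missing values (None) in column `target` with the mean of the
    k nearest complete rows, using Euclidean distance on the other columns."""
    complete = [r for r in rows if r[target] is not None]
    imputed = []
    for row in rows:
        if row[target] is not None:
            imputed.append(list(row))
            continue

        # Distance is computed over every column except the one being imputed.
        def distance(other):
            return math.sqrt(sum((a - b) ** 2
                                 for i, (a, b) in enumerate(zip(row, other))
                                 if i != target))

        neighbours = sorted(complete, key=distance)[:k]
        filled = list(row)
        filled[target] = sum(n[target] for n in neighbours) / len(neighbours)
        imputed.append(filled)
    return imputed

# Toy rows of [distance, delay]; the delay of the last row is missing.
rows = [
    [100, 5.0],
    [110, 7.0],
    [120, 9.0],
    [900, 60.0],
    [105, None],
]
result = knn_impute(rows, target=1, k=3)
print(result[-1])  # [105, 7.0]
```

The three nearest rows by distance are the flights of 100, 110, and 120 miles, so the missing delay becomes the mean of 5.0, 7.0, and 9.0. The far-away 900-mile flight does not influence the result.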

## Step 0: Install the Library

To use this example notebook, you will need to install the "Smart Imputation" library in your cluster.  Open the "Clusters" admin page in Databricks and select your cluster.  On the "Libraries" tab, click the "Install New" button.  The library source will be Maven. You only need to supply the following coordinates:  `JMailloH:Smart_Imputation:1.0`

## Step 1:  Prepare Data in Python
We need some data with missing values that we can impute.  We begin by using Python to create a Spark dataframe from the flight delay sample dataset that is included with Databricks.

In [0]:
df = (spark.read
        .option("header", "true")
        .option("inferSchema", "true")
        .csv("/databricks-datasets/flights/departuredelays.csv")
        .cache()
     )

The dataset has a couple of string columns.  Let's index those into numeric features, as we would in a typical machine learning project.

In [0]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer

string_columns = [dt[0] for dt in df.dtypes if dt[1] == 'string']

# Pipeline.fit() below fits each indexer, so the stages are left unfitted here.
indexers = [StringIndexer(inputCol=c, outputCol=c + "_index") for c in string_columns]

pipeline = Pipeline(stages=indexers)
df_indexed = pipeline.fit(df).transform(df).drop(*string_columns)
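By default, `StringIndexer` assigns indices in order of descending label frequency, so the most common value gets index `0.0`. The sketch below mimics that behaviour in plain Python. The helper `frequency_index` and the sample airport codes are hypothetical, and the alphabetical tie-break is an assumption about recent Spark versions.

```python
from collections import Counter

def frequency_index(values):
    """Map each distinct string to a float index, most frequent first.
    Ties are broken alphabetically."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}

origins = ["ATL", "SEA", "ATL", "ORD", "ATL", "SEA"]  # made-up sample
mapping = frequency_index(origins)
print(mapping)                        # {'ATL': 0.0, 'SEA': 1.0, 'ORD': 2.0}
print([mapping[o] for o in origins])  # [0.0, 1.0, 0.0, 2.0, 0.0, 1.0]
```

This is why the indexed columns hold floating-point values such as `0.0` and `5.0` rather than integers.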

Our example project is to impute missing values in our dataset.  Let's see how many values in the `delay` column are null...

In [0]:
import pyspark.sql.functions as pyf

df_indexed.withColumn("delay_is_null", pyf.col("delay").isNull()).groupBy("delay_is_null").count().show()
print("Total Rows: {:,}".format(df_indexed.count()))

As the query above shows, there are no missing values in this dataset.  That's no problem.  Let's just randomly select 20% of the rows and set the `delay` column to null for those rows.

In [0]:
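# Randomly split off roughly 20% of the rows and blank out their delay values.
splits = df_indexed.randomSplit([0.8, 0.2])
# Cast the null literal so both halves of the union share the same column type.
delay_type = df_indexed.schema["delay"].dataType
splits[1] = splits[1].withColumn("delay", pyf.lit(None).cast(delay_type))
df_indexed = splits[0].union(splits[1]).orderBy("origin_index", "date")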

Re-run the same query as above to see how many values in the `delay` column are null.

In [0]:
df_indexed.withColumn("delay_is_null", pyf.col("delay").isNull()).groupBy("delay_is_null").count().show()
print("Total Rows: {:,}".format(df_indexed.count()))

We now have a Spark dataframe referenced in PySpark that contains the data we want to work with.  However, we need to use that same Spark dataframe in Scala.  To do this, we simply register our Spark dataframe as a "temp view" in Spark.
We will name the temp view `flights` and then be able to access it by that name in Scala (or any other language environment that supports Spark).

In [0]:
df_indexed.createOrReplaceTempView("flights")

## Step 2:  Read the Data in Scala

Now we can switch over to Scala.  To reference the same Spark dataframe we created with Python, we simply access our temp view by name, just as if it were a regular table.

Once we can access the Spark dataframe in Scala, we can continue processing it.  The "Smart Imputation" library that we will be using in this example expects an RDD in which each row is a single comma-separated string containing all of that row's values.
It also expects missing values to be represented by a question mark `?`.  We can perform those conversions easily with Scala.

In [0]:
%scala
val dfIndexed = spark.table("flights").cache()
val rdd_data = dfIndexed.map(row => row.mkString(",").replace("null", "?")).rdd
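For readers more at home in Python, the per-row conversion can be sketched as below. The helper `encode_row` is hypothetical and is not part of the notebook's pipeline. It substitutes the marker value by value, whereas the Scala cell replaces the text `null` in the joined string. Both give the same result here because every column is numeric.

```python
def encode_row(row, missing="?"):
    """Join a row's values with commas, writing `missing` for None."""
    return ",".join(missing if value is None else str(value) for value in row)

row = (1010859, None, 1585, 5.0, 30.0)  # the delay value is missing
print(encode_row(row))  # 1010859,?,1585,5.0,30.0
```

Substituting per value would matter if a string column could contain the text `null`, since a replacement on the joined string would rewrite that too.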

The "Smart Imputation" library also needs a header file that describes the dataset.  It expects this file to be in the [ARFF](https://www.cs.waikato.ac.nz/ml/weka/arff.html) format.  This little snippet of code will create the
file for us.

In [0]:
%scala

import org.apache.spark.sql.functions.{min, max}

var arffHeader = "@relation flights";

for ((c,d) <- dfIndexed.dtypes) {
  var attributeType = d match {
      case "IntegerType"  => "integer"
      case "DoubleType"  => "real"
      case _  => "Invalid Type"
  }
  
  var line = "\n@attribute " + c + " " + attributeType

  if (attributeType == "integer" || attributeType == "real") {
    // get the range of values in the column
    var range = dfIndexed.agg(min(c), max(c)).head
    var rangeText = " [" + range(0) + ", " + range(1) + "]"
    line = line.concat(rangeText)
  }
  
  arffHeader = arffHeader.concat(line)
}

arffHeader = arffHeader.concat("\n@inputs " + dfIndexed.columns.filter(_ != "distance").mkString(", "))
arffHeader = arffHeader.concat("\n@outputs distance")
arffHeader = arffHeader.concat("\n@data")

dbutils.fs.put("/knn/data.header", arffHeader, true)
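To see what the generated header looks like without running the Scala cell, the same string-building logic can be sketched in Python. The function `build_arff_header` is hypothetical, and the value ranges are invented for illustration rather than taken from the dataset. Unlike the Scala cell, which writes `Invalid Type` for anything other than integers and doubles, this sketch raises a `KeyError` on an unknown type.

```python
def build_arff_header(relation, columns, output):
    """Build a header from (name, spark_type, min, max) tuples."""
    type_names = {"IntegerType": "integer", "DoubleType": "real"}
    lines = ["@relation " + relation]
    for name, spark_type, low, high in columns:
        lines.append("@attribute {} {} [{}, {}]".format(
            name, type_names[spark_type], low, high))
    # Every column except the output column is listed as an input.
    inputs = [name for name, _, _, _ in columns if name != output]
    lines.append("@inputs " + ", ".join(inputs))
    lines.append("@outputs " + output)
    lines.append("@data")
    return "\n".join(lines)

# The ranges below are invented for illustration, not taken from the dataset.
columns = [
    ("date", "IntegerType", 1010000, 3312359),
    ("delay", "IntegerType", -100, 1500),
    ("distance", "IntegerType", 20, 4500),
    ("origin_index", "DoubleType", 0.0, 250.0),
    ("destination_index", "DoubleType", 0.0, 300.0),
]
header = build_arff_header("flights", columns, output="distance")
print(header)
```

The printed header starts with `@relation flights`, lists one `@attribute` line per column with its range, and ends with the `@inputs`, `@outputs`, and `@data` lines.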

Now we have everything we need to call the "Smart Imputation" library.  So let's kick it off!  The process can take a while to run.

In [0]:
%scala

import org.apache.spark.mllib.preprocessing.kNNI_IS.KNNI_IS

val K = 3
val distanceType = 2       // MANHATTAN = 1 ; EUCLIDEAN = 2 ; HVDM = 3
val pathHeader = "/dbfs/knn/data.header"
val numPartitionMap = 10
val version = "global"     // "global" or "local"

val knni = KNNI_IS.setup(rdd_data, K, distanceType, pathHeader, numPartitionMap, version)
val imputedData = knni.imputation(sc)

The "Smart Imputation" library returns an RDD of arrays of strings.  Let's convert the RDD to a dataframe, extract the array elements into individual columns, and cast the values back to their original data types.
(We also save the imputed data to a Parquet file.  This is optional.  The imputation process takes long enough that it is worth keeping the result in case we need this data again.)

In [0]:
%scala

import org.apache.spark.sql.functions._

var imputedDF = imputedData.toDF()
var i = 1

for ((c,d) <- dfIndexed.dtypes) {
  imputedDF = imputedDF.withColumn(c, element_at($"value", i).cast(d.replaceAll("Type$", "")))
  i += 1
}

imputedDF = imputedDF.drop($"value")

imputedDF.write.mode("overwrite").parquet("/knn/output")
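The cast back to the original types can be pictured as the following plain Python sketch, which pairs each string in a row with its column name and type. The helper `restore_types` is hypothetical, and the exact string format the library emits is an assumption. The sample values mirror one row of the notebook's own display output.

```python
def restore_types(values, schema):
    """Turn a list of strings into a dict of typed values.
    `schema` is a list of (column_name, spark_type) pairs in column order."""
    casts = {
        # int(float(s)) so that a value written as "1642.0" still parses
        "IntegerType": lambda s: int(float(s)),
        "DoubleType": float,
    }
    return {name: casts[spark_type](raw)
            for (name, spark_type), raw in zip(schema, values)}

schema = [
    ("date", "IntegerType"),
    ("delay", "IntegerType"),
    ("distance", "IntegerType"),
    ("origin_index", "DoubleType"),
    ("destination_index", "DoubleType"),
]
imputed_row = ["1010859", "1642", "1585", "5.0", "30.0"]
restored = restore_types(imputed_row, schema)
print(restored)
```

The Scala cell does the same thing column by column, using `element_at` with a 1-based index to pick each array element before casting it.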

One last step in Scala.  We need to register our Spark dataframe containing the imputed data as a temp view.  This will allow us to pick up this new dataframe in Python.

In [0]:
%scala
imputedDF.createOrReplaceTempView("imputed")

## Step 3:  Back to Python

Now we can return to Python and continue processing the imputed data.  We can access the Spark dataframe containing the imputed data simply by referencing the name of the temp view we registered.

In [0]:
imputed_df = spark.table("imputed")

imputed_df.withColumn("delay_is_null", pyf.col("delay").isNull()).groupBy("delay_is_null").count().show()
print("Total Rows: {:,}".format(imputed_df.count()))

From here on, we can carry out regular analysis in Python, just as we normally would.

In [0]:
display(df_indexed)

date,delay,distance,origin_index,destination_index
1010600,-3.0,599,0.0,5.0
1010615,-3.0,513,0.0,21.0
1010630,-5.0,351,0.0,10.0
1010630,-1.0,788,0.0,18.0
1010640,-4.0,517,0.0,19.0
1010700,-2.0,636,0.0,1.0
1010700,1.0,197,0.0,9.0
1010700,9.0,501,0.0,20.0
1010700,18.0,662,0.0,12.0
1010707,2.0,527,0.0,2.0


In [0]:
x_df = (
  df_indexed.alias("o")
    .join(imputed_df.alias("i"),
          (df_indexed["date"] == imputed_df["date"]) &
          (df_indexed["distance"] == imputed_df["distance"]) &
          (df_indexed["origin_index"] == imputed_df["origin_index"]) &
          (df_indexed["destination_index"] == imputed_df["destination_index"])
         )
)

x_df.count()

In [0]:
display(x_df.filter(pyf.col("o.delay").isNull()))

date,delay,distance,origin_index,destination_index,date.1,delay.1,distance.1,origin_index.1,destination_index.1
1010859,,1585,5.0,30.0,1010859,1642,1585,5.0,30.0
1011059,,139,11.0,72.0,1011059,1642,139,11.0,72.0
1011500,,207,0.0,239.0,1011500,1642,207,0.0,239.0
1011505,,660,0.0,15.0,1011505,1642,660,0.0,15.0
1011550,,825,10.0,12.0,1011550,1642,825,10.0,12.0
1020820,,1192,8.0,31.0,1020820,1642,1192,8.0,31.0
1020829,,64,5.0,166.0,1020829,1642,64,5.0,166.0
1020835,,429,6.0,76.0,1020835,1642,429,6.0,76.0
1020840,,751,2.0,52.0,1020840,1642,751,2.0,52.0
1021220,,336,8.0,40.0,1021220,1642,336,8.0,40.0


In [0]:
imputed_df.groupBy("delay").count().orderBy(pyf.desc("delay")).show()