## Overview

This notebook will show you how to create and query a table or DataFrame that you uploaded to DBFS. [DBFS](https://docs.databricks.com/user-guide/dbfs-databricks-file-system.html) is a Databricks File System that allows you to store data for querying inside of Databricks. This notebook assumes that you have a file already inside of DBFS that you would like to read from.

This notebook is written in **Python** so the default cell type is Python. However, you can use different languages by using the `%LANGUAGE` syntax. Python, Scala, SQL, and R are all supported.

In [0]:
import pandas as pd
import numpy as np


# File location and type
file_location = "/FileStore/tables/groceries___groceries.csv"
file_type = "csv"

# CSV options
infer_schema = "false"
first_row_is_header = "false"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
df = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

display(df)

_c0,_c1,_c2,_c3,_c4,_c5,_c6,_c7,_c8,_c9,_c10,_c11,_c12,_c13,_c14,_c15,_c16,_c17,_c18,_c19,_c20,_c21,_c22,_c23,_c24,_c25,_c26,_c27,_c28,_c29,_c30,_c31,_c32
Item(s),Item 1,Item 2,Item 3,Item 4,Item 5,Item 6,Item 7,Item 8,Item 9,Item 10,Item 11,Item 12,Item 13,Item 14,Item 15,Item 16,Item 17,Item 18,Item 19,Item 20,Item 21,Item 22,Item 23,Item 24,Item 25,Item 26,Item 27,Item 28,Item 29,Item 30,Item 31,Item 32
4,citrus fruit,semi-finished bread,margarine,ready soups,,,,,,,,,,,,,,,,,,,,,,,,,,,,
3,tropical fruit,yogurt,coffee,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
1,whole milk,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
4,pip fruit,yogurt,cream cheese,meat spreads,,,,,,,,,,,,,,,,,,,,,,,,,,,,
4,other vegetables,whole milk,condensed milk,long life bakery product,,,,,,,,,,,,,,,,,,,,,,,,,,,,
5,whole milk,butter,yogurt,rice,abrasive cleaner,,,,,,,,,,,,,,,,,,,,,,,,,,,
1,rolls/buns,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
5,other vegetables,UHT-milk,rolls/buns,bottled beer,liquor (appetizer),,,,,,,,,,,,,,,,,,,,,,,,,,,
1,potted plants,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,


In [0]:
# Create a view or table

temp_table_name = "groceries___groceries_csv"

df.createOrReplaceTempView(temp_table_name)

In [0]:
%sql

/* Query the created temp table in a SQL cell */

select * from `groceries___groceries_csv`


_c0,_c1,_c2,_c3,_c4,_c5,_c6,_c7,_c8,_c9,_c10,_c11,_c12,_c13,_c14,_c15,_c16,_c17,_c18,_c19,_c20,_c21,_c22,_c23,_c24,_c25,_c26,_c27,_c28,_c29,_c30,_c31,_c32
Item(s),Item 1,Item 2,Item 3,Item 4,Item 5,Item 6,Item 7,Item 8,Item 9,Item 10,Item 11,Item 12,Item 13,Item 14,Item 15,Item 16,Item 17,Item 18,Item 19,Item 20,Item 21,Item 22,Item 23,Item 24,Item 25,Item 26,Item 27,Item 28,Item 29,Item 30,Item 31,Item 32
4,citrus fruit,semi-finished bread,margarine,ready soups,,,,,,,,,,,,,,,,,,,,,,,,,,,,
3,tropical fruit,yogurt,coffee,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
1,whole milk,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
4,pip fruit,yogurt,cream cheese,meat spreads,,,,,,,,,,,,,,,,,,,,,,,,,,,,
4,other vegetables,whole milk,condensed milk,long life bakery product,,,,,,,,,,,,,,,,,,,,,,,,,,,,
5,whole milk,butter,yogurt,rice,abrasive cleaner,,,,,,,,,,,,,,,,,,,,,,,,,,,
1,rolls/buns,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
5,other vegetables,UHT-milk,rolls/buns,bottled beer,liquor (appetizer),,,,,,,,,,,,,,,,,,,,,,,,,,,
1,potted plants,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,


In [0]:
from pyspark.sql.functions import split
data = (spark.read
    .text("/FileStore/tables/groceries___groceries.csv")
    .select(split("value", "\s+").alias("items")))
data.show(truncate=False)


display(data)

items
"List(Item(s),Item, 1,Item, 2,Item, 3,Item, 4,Item, 5,Item, 6,Item, 7,Item, 8,Item, 9,Item, 10,Item, 11,Item, 12,Item, 13,Item, 14,Item, 15,Item, 16,Item, 17,Item, 18,Item, 19,Item, 20,Item, 21,Item, 22,Item, 23,Item, 24,Item, 25,Item, 26,Item, 27,Item, 28,Item, 29,Item, 30,Item, 31,Item, 32)"
"List(4,citrus, fruit,semi-finished, bread,margarine,ready, soups,,,,,,,,,,,,,,,,,,,,,,,,,,,,)"
"List(3,tropical, fruit,yogurt,coffee,,,,,,,,,,,,,,,,,,,,,,,,,,,,,)"
"List(1,whole, milk,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,)"
"List(4,pip, fruit,yogurt,cream, cheese,meat, spreads,,,,,,,,,,,,,,,,,,,,,,,,,,,,)"
"List(4,other, vegetables,whole, milk,condensed, milk,long, life, bakery, product,,,,,,,,,,,,,,,,,,,,,,,,,,,,)"
"List(5,whole, milk,butter,yogurt,rice,abrasive, cleaner,,,,,,,,,,,,,,,,,,,,,,,,,,,)"
"List(1,rolls/buns,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,)"
"List(5,other, vegetables,UHT-milk,rolls/buns,bottled, beer,liquor, (appetizer),,,,,,,,,,,,,,,,,,,,,,,,,,,)"
"List(1,potted, plants,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,)"


In [0]:
import sys
import mlflow.tensorflow
import mlflow.spark

from pyspark import keyword_only, since
from pyspark.sql import DataFrame
from pyspark.ml.util import JavaMLWritable, JavaMLReadable
from pyspark.ml.wrapper import JavaEstimator, JavaModel, JavaParams
from pyspark.ml.param.shared import HasPredictionCol, Param, TypeConverters, Params
__all__ = ["FPGrowth", "FPGrowthModel", "PrefixSpan"]



class _FPGrowthParams(HasPredictionCol):
    """
    Params for :py:class:`FPGrowth` and :py:class:`FPGrowthModel`.

    .. versionadded:: 3.0.0
    """

    itemsCol = Param(Params._dummy(), "itemsCol",
                     "items column name", typeConverter=TypeConverters.toString)
    minSupport = Param(
        Params._dummy(),
        "minSupport",
        "Minimal support level of the frequent pattern. [0.0, 1.0]. " +
        "Any pattern that appears more than (minSupport * size-of-the-dataset) " +
        "times will be output in the frequent itemsets.",
        typeConverter=TypeConverters.toFloat)
    numPartitions = Param(
        Params._dummy(),
        "numPartitions",
        "Number of partitions (at least 1) used by parallel FP-growth. " +
        "By default the param is not set, " +
        "and partition number of the input dataset is used.",
        typeConverter=TypeConverters.toInt)
    minConfidence = Param(
        Params._dummy(),
        "minConfidence",
        "Minimal confidence for generating Association Rule. [0.0, 1.0]. " +
        "minConfidence will not affect the mining for frequent itemsets, " +
        "but will affect the association rules generation.",
        typeConverter=TypeConverters.toFloat)
  


    
    def __init__(self, *args):
        super(_FPGrowthParams, self).__init__(*args)
        self._setDefault(minSupport=0.3, minConfidence=0.8,
                         itemsCol="items", predictionCol="prediction")

    def getItemsCol(self):
        """
        Gets the value of itemsCol or its default value.
        """
        return self.getOrDefault(self.itemsCol)

    def getMinSupport(self):
        """
        Gets the value of minSupport or its default value.
        """
        return self.getOrDefault(self.minSupport)

    def getNumPartitions(self):
        """
        Gets the value of :py:attr:`numPartitions` or its default value.
        """
        return self.getOrDefault(self.numPartitions)

    def getMinConfidence(self):
        """
        Gets the value of minConfidence or its default value.
        """
        return self.getOrDefault(self.minConfidence)


In [0]:
class FPGrowthModel(JavaModel, _FPGrowthParams, JavaMLWritable, JavaMLReadable):
       """
    Model fitted by FPGrowth.

    .. versionadded:: 2.2.0
    """

In [0]:

    def setItemsCol(self, value):
        """
        Sets the value of :py:attr:`itemsCol`.
        """
        return self._set(itemsCol=value)

In [0]:
def setMinConfidence(self, value):
        """
        Sets the value of :py:attr:`minConfidence`.
        """
        return self._set(minConfidence=value)

In [0]:
    def setPredictionCol(self, value):
        """
        Sets the value of :py:attr:`predictionCol`.
        """
        return self._set(predictionCol=value)


    @property
    @since("2.2.0")
    def freqItemsets(self):
        """
        DataFrame with two columns:
        * `items` - Itemset of the same type as the input column.
        * `freq`  - Frequency of the itemset (`LongType`).
        """
        return self._call_java("freqItemsets")

    @property
    @since("2.2.0")
    def associationRules(self):
        """
        DataFrame with four columns:
        * `antecedent`  - Array of the same type as the input column.
        * `consequent`  - Array of the same type as the input column.
        * `confidence`  - Confidence for the rule (`DoubleType`).
        * `lift`        - Lift for the rule (`DoubleType`).
        """
        return self._call_java("associationRules")



In [0]:
class FPGrowth(JavaEstimator, _FPGrowthParams, JavaMLWritable, JavaMLReadable):
    r"""
    A parallel FP-growth algorithm to mine frequent itemsets.

    .. versionadded:: 2.2.0

    Notes
    -----

    The algorithm is described in
    Li et al., PFP: Parallel FP-Growth for Query Recommendation [1]_.
    PFP distributes computation in such a way that each worker executes an
    independent group of mining tasks. The FP-Growth algorithm is described in
    Han et al., Mining frequent patterns without candidate generation [2]_

    NULL values in the feature column are ignored during `fit()`.

    Internally `transform` `collects` and `broadcasts` association rules.


    .. [1] Haoyuan Li, Yi Wang, Dong Zhang, Ming Zhang, and Edward Y. Chang. 2008.
        Pfp: parallel fp-growth for query recommendation.
        In Proceedings of the 2008 ACM conference on Recommender systems (RecSys '08).
        Association for Computing Machinery, New York, NY, USA, 107-114.
        DOI: https://doi.org/10.1145/1454008.1454027
    .. [2] Jiawei Han, Jian Pei, and Yiwen Yin. 2000.
        Mining frequent patterns without candidate generation.
        SIGMOD Rec. 29, 2 (June 2000), 1-12.
        DOI: https://doi.org/10.1145/335191.335372


    Examples
    --------
    >>> from pyspark.sql.functions import split
    >>> data = (spark.read
    ...     .text("data/mllib/sample_fpgrowth.txt")
    ...     .select(split("value", "\s+").alias("items")))
    >>> data.show(truncate=False)
    +------------------------+
    |items                   |
    +------------------------+
    |[r, z, h, k, p]         |
    |[z, y, x, w, v, u, t, s]|
    |[s, x, o, n, r]         |
    |[x, z, y, m, t, s, q, e]|
    |[z]                     |
    |[x, z, y, r, q, t, p]   |
    +------------------------+
    ...
    >>> fp = FPGrowth(minSupport=0.2, minConfidence=0.7)
    >>> fpm = fp.fit(data)
    >>> fpm.setPredictionCol("newPrediction")
    FPGrowthModel...
    >>> fpm.freqItemsets.show(5)
    +---------+----+
    |    items|freq|
    +---------+----+
    |      [s]|   3|
    |   [s, x]|   3|
    |[s, x, z]|   2|
    |   [s, z]|   2|
    |      [r]|   3|
    +---------+----+
    only showing top 5 rows
    ...
    >>> fpm.associationRules.show(5)
    +----------+----------+----------+----+------------------+
    |antecedent|consequent|confidence|lift|           support|
    +----------+----------+----------+----+------------------+
    |    [t, s]|       [y]|       1.0| 2.0|0.3333333333333333|
    |    [t, s]|       [x]|       1.0| 1.5|0.3333333333333333|
    |    [t, s]|       [z]|       1.0| 1.2|0.3333333333333333|
    |       [p]|       [r]|       1.0| 2.0|0.3333333333333333|
    |       [p]|       [z]|       1.0| 1.2|0.3333333333333333|
    +----------+----------+----------+----+------------------+
    only showing top 5 rows
    ...
    >>> new_data = spark.createDataFrame([(["t", "s"], )], ["items"])
    >>> sorted(fpm.transform(new_data).first().newPrediction)
    ['x', 'y', 'z']
    >>> model_path = temp_path + "/fpm_model"
    >>> fpm.save(model_path)
    >>> model2 = FPGrowthModel.load(model_path)
    >>> fpm.transform(data).take(1) == model2.transform(data).take(1)
    True
    """
    @keyword_only
    def __init__(self, *, minSupport=0.3, minConfidence=0.8, itemsCol="items",
                 predictionCol="prediction", numPartitions=None):
        """
        __init__(self, \\*, minSupport=0.3, minConfidence=0.8, itemsCol="items", \
                 predictionCol="prediction", numPartitions=None)
        """
        super(FPGrowth, self).__init__()
        self._java_obj = self._new_java_obj("org.apache.spark.ml.fpm.FPGrowth", self.uid)
        kwargs = self._input_kwargs
        self.setParams(**kwargs)

In [0]:
# With this registered as a temp view, it will only be available to this particular notebook. If you'd like other users to be able to query this table, you can also create a table from the DataFrame.
# Once saved, this table will persist across cluster restarts as well as allow various users across different notebooks to query this data.
# To do so, choose your table name and uncomment the bottom line.

permanent_table_name = "groceries___groceries_csv"


# df.write.format("parquet").saveAsTable(permanent_table_name)