# Applying FPGrowth to Grouped Data with Spark
#### Background
The task is to run the FPGrowth algorithm separately on each group of a dataset that contains multiple groups.
- Spark provides several options for distributed computations, including map, mapPartitions, foreachPartition, and sc.parallelize. These approaches are powerful for many distributed data processing tasks.
- The goal was to assess whether these methods could efficiently process grouped data using FPGrowth.



In [None]:

%%capture
!pip install pyspark findspark
!pip install mlxtend

In [None]:
import random
import string

import pandas as pd
import pyspark.sql.functions as F
import pyspark.sql.types as T
from mlxtend.frequent_patterns import fpgrowth, association_rules
from mlxtend.preprocessing import TransactionEncoder
from pyspark.ml.fpm import FPGrowth
from pyspark.sql import SparkSession



The data used for experimentation is a toy dataset with the following structure:
- peer (int)
- id (string)
- entitlements (string)

In [None]:
n_rows = 1000



In [None]:
df = pd.DataFrame(
    {
        'peer': [random.randint(0,5) for _ in range(n_rows)],
        'id': [f"{random.choice(['a', 'b', 'c'])}{random.randint(0,3)}" for _ in range(n_rows)],
        'entitlements': [random.choice(string.ascii_letters).lower() for _ in range(n_rows)]
    }
)

df.head(5)



Unnamed: 0,peer,id,entitlements
0,2,c1,h
1,1,a2,l
2,4,c1,j
3,3,a2,s
4,3,c0,e


In [None]:
#global spark session
spark = (SparkSession.builder
        .appName("fpgrowth_experiment")
        .getOrCreate()
        )



In [None]:
spark_df = (spark.createDataFrame(df)
            .groupby('peer', 'id')
            .agg(
                F.collect_list('entitlements').alias('ent')
            )
            .withColumn('ent', F.array_distinct('ent'))
          )
spark_df.show(2)



+----+---+--------------------+
|peer| id|                 ent|
+----+---+--------------------+
|   2| c2|[p, q, o, m, g, k...|
|   0| b0|[m, k, r, g, v, j...|
+----+---+--------------------+
only showing top 2 rows



In [None]:
spark_df.printSchema()

root
 |-- peer: long (nullable = true)
 |-- id: string (nullable = true)
 |-- ent: array (nullable = false)
 |    |-- element: string (containsNull = false)





#### Quick litmus test: count the entitlements assigned to each id

In [None]:

# number of unique peer groups
num_part = spark_df.dropDuplicates(['peer']).count()
print("unique peer groups in data", num_part)



unique peer groups in data 6


In [None]:
def dummy_count(r):
  for i in r:
    yield (i['peer'], len(i['ent']))

result_df = (
    spark_df
    .repartition(num_part, 'peer')
    .rdd
    .mapPartitions(dummy_count)
    .toDF(['peer', 'length'])
)

result_df.toPandas()



Unnamed: 0,peer,length
0,0,17
1,0,7
2,0,9
3,0,9
4,0,9
...,...,...
67,1,8
68,1,11
69,1,13
70,1,7
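
One caveat with the cell above: `repartition(num_part, 'peer')` hash-partitions on `peer`, which keeps all rows of a peer in one partition but does not guarantee one peer per partition. A partition function should therefore not assume it sees a single group. The following is a minimal sketch of a partition function that aggregates per peer within whatever rows it receives. `count_per_peer` is a hypothetical name, and plain dicts stand in for Spark `Row` objects so the logic can be tried without a cluster.

```python
from collections import defaultdict

def count_per_peer(rows):
    """Partition function: yield (peer, n_ids, n_entitlements) for each peer seen."""
    n_ids = defaultdict(int)
    n_ents = defaultdict(int)
    for row in rows:  # `rows` is an iterator over the rows of one partition
        n_ids[row["peer"]] += 1
        n_ents[row["peer"]] += len(row["ent"])
    for peer in sorted(n_ids):
        yield (peer, n_ids[peer], n_ents[peer])

# Stand-in for one partition that happens to hold two peers (assumed data).
partition = iter([
    {"peer": 0, "id": "a1", "ent": ["m", "k"]},
    {"peer": 3, "id": "b0", "ent": ["x"]},
    {"peer": 0, "id": "c2", "ent": ["r", "g", "v"]},
])
summary = list(count_per_peer(partition))
print(summary)
```

With Spark, the same function could be passed to `.rdd.mapPartitions(count_per_peer)` in place of `dummy_count`, since `Row` objects support the same `row["peer"]` access.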


#### Spark FPGrowth works with Spark DataFrames
The `FPGrowth` estimator in `pyspark.ml.fpm` requires a Spark DataFrame as input. It cannot be fitted directly on an RDD or on a pandas DataFrame. (The older RDD-based `pyspark.mllib.fpm.FPGrowth` does accept an RDD, but it is not used here.)

In [None]:
spark_df.show(2)
fp_growth = FPGrowth(itemsCol="ent", minSupport=0.1, minConfidence=0.1)
model = fp_growth.fit(spark_df)
model.freqItemsets.show(2)

+----+---+--------------------+
|peer| id|                 ent|
+----+---+--------------------+
|   2| c2|[p, q, o, m, g, k...|
|   0| b0|[m, k, r, g, v, j...|
+----+---+--------------------+
only showing top 2 rows

+------+----+
| items|freq|
+------+----+
|   [y]|  26|
|[y, h]|  10|
+------+----+
only showing top 2 rows
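
Since the estimator is fitted from the driver on a Spark DataFrame, one way to get a model per group is a plain driver-side loop that filters the DataFrame to one peer at a time. The sketch below assumes the number of groups is small enough to iterate over (six in this toy dataset). `fit_per_group` and `make_estimator` are hypothetical names, not part of Spark. Each `fit` is still a distributed Spark job; only the loop itself is sequential.

```python
def fit_per_group(df, group_col, make_estimator):
    """Fit one model per distinct value of `group_col`, looping on the driver.

    `df` is expected to be a Spark DataFrame; `make_estimator` is a
    zero-argument callable returning a fresh estimator for each group.
    """
    keys = [row[group_col] for row in df.select(group_col).distinct().collect()]
    models = {}
    for key in keys:
        subset = df.filter(df[group_col] == key)  # rows of this group only
        models[key] = make_estimator().fit(subset)
    return models

# Possible usage with the objects defined in this notebook:
# models = fit_per_group(
#     spark_df, "peer",
#     lambda: FPGrowth(itemsCol="ent", minSupport=0.1, minConfidence=0.1),
# )
# models[0].freqItemsets.show(2)
```

The trade-off is that the groups are processed one after another, so this does not parallelize across groups the way a `map` over groups would.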



To show that FPGrowth fails on other input types, let's attempt to fit it on a pandas DataFrame instead of a Spark DataFrame.

In [None]:

spark_df.show(2)

fp_growth = FPGrowth(itemsCol="ent", minSupport=0.1, minConfidence=0.1)
# Fails: fit() expects a Spark DataFrame, not a pandas DataFrame
model = fp_growth.fit(spark_df.toPandas())
model.freqItemsets.show(2)

+----+---+--------------------+
|peer| id|                 ent|
+----+---+--------------------+
|   2| c2|[p, q, o, m, g, k...|
|   0| b0|[m, k, r, g, v, j...|
+----+---+--------------------+
only showing top 2 rows


AttributeError: 'DataFrame' object has no attribute '_jdf'

#### Example 1
This example shows that a pandas DataFrame can be built and returned from within a map call. While Zack's sample code uses sc.parallelize, this approach establishes a baseline for what works with a map operation on the RDD underlying a Spark DataFrame. The groupby and aggregation that run before `.rdd` still go through the Catalyst optimizer; the Python function passed to map does not.

In [None]:
def createdf(r):
  ent_coll = r['com_ent']
  peer = [r['peer']]  * len(ent_coll)
  df = pd.DataFrame({
      'peer': peer,
      'df': ent_coll
  })
  return (r['peer'], {'df': df})


result_df = (
    spark_df
    .groupby('peer')
    .agg(
        F.collect_list('ent').alias('com_ent')
    )
    .rdd
    .map(createdf)

)
#left in rdd format, to maintain the df structure

display(type(result_df.take(2)[0][1]['df']))
display(result_df.take(2))



[(0,
  {'df':     peer                                                 df
   0      0  [m, k, r, g, v, j, p, n, c, b, y, e, l, f, d, ...
   1      0                              [r, y, p, e, o, k, u]
   2      0                        [m, j, k, q, e, r, w, g, l]
   3      0                        [m, i, f, u, e, w, g, j, o]
   4      0                        [j, x, t, b, n, s, r, m, q]
   5      0                                       [k, a, c, w]
   6      0                              [m, g, l, r, w, v, x]
   7      0         [w, f, e, i, a, x, q, r, o, t, c, s, y, v]
   8      0                        [u, n, e, b, f, h, p, v, z]
   9      0                     [m, z, h, y, o, u, w, v, i, f]
   10     0                  [v, w, u, h, d, j, f, q, i, r, c]
   11     0                     [i, y, h, f, g, l, m, j, o, z]}),
 (5,
  {'df':     peer                                       df
   0      5        [n, i, d, a, g, h, x, u, c, t, r]
   1      5        [p, a, j, m, k, w, u, b, e, t, 

#### Example 2
The code below demonstrates the main challenge of applying FPGrowth within a map operation. FPGrowth needs a Spark DataFrame, so the first step inside the mapped function would be converting each group's data back into a Spark DataFrame. This example uses the global Spark session for that conversion.

In [None]:
def createdf(r):
  ent_coll = r['com_ent']
  peer = [r['peer']]  * len(ent_coll)

  df = pd.DataFrame({
      'peer': peer,
      'df': ent_coll
  })
  local_df = spark.createDataFrame(df)
  return (r['peer'], {'df': df})


result_df = (
    spark_df
    .groupby('peer')
    .agg(
        F.collect_list('ent').alias('com_ent')
    )
    .rdd
    .map(createdf)

)

display(type(result_df.take(2)[0][1]['df']))
display(result_df.take(2))

Traceback (most recent call last):
  File "/usr/local/lib/python3.11/dist-packages/pyspark/serializers.py", line 459, in dumps
    return cloudpickle.dumps(obj, pickle_protocol)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/pyspark/cloudpickle/cloudpickle_fast.py", line 73, in dumps
    cp.dump(obj)
  File "/usr/local/lib/python3.11/dist-packages/pyspark/cloudpickle/cloudpickle_fast.py", line 632, in dump
    return Pickler.dump(self, obj)
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/pyspark/context.py", line 466, in __getnewargs__
    raise PySparkRuntimeError(
pyspark.errors.exceptions.base.PySparkRuntimeError: [CONTEXT_ONLY_VALID_ON_DRIVER] It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-

PicklingError: Could not serialize object: PySparkRuntimeError: [CONTEXT_ONLY_VALID_ON_DRIVER] It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.
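
The traceback shows that the failure happens on the driver, before any task runs: Spark has to pickle the mapped function together with everything it references, and `SparkContext` refuses to be pickled by raising from `__getnewargs__`. The same mechanism can be reproduced with the standard-library `pickle` module. In this sketch `DriverOnlyHandle` is a hypothetical stand-in for the context, and a dict stands in for the variables a function refers to. Spark itself uses cloudpickle and wraps the error in a `PicklingError`, which this sketch does not reproduce.

```python
import pickle

class DriverOnlyHandle:
    """Stand-in for an object that must never leave the driver process."""

    def __getnewargs__(self):
        # pickle calls this hook while serializing the instance.
        raise RuntimeError("handle is only valid on the driver")

# Stand-in for what the mapped function refers to (e.g. the global `spark`).
captured = {"spark": DriverOnlyHandle()}

try:
    pickle.dumps(captured)
    outcome = "pickled"
except RuntimeError as exc:
    outcome = f"failed: {exc}"

print(outcome)
```

Anything that holds a reference to such an object, directly or through a helper, hits the same error when it is serialized.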

#### Example 3

The third example attempts to explicitly specify the structure of the resulting Spark DataFrame by defining a schema in the createdf method:

In [None]:



def createdf(r):
  ent_coll = r['com_ent']
  peer = [r['peer']]  * len(ent_coll)

  df = pd.DataFrame({
      'peer': peer,
      'df': ent_coll
  })
  local_df = spark.createDataFrame(df, schema = T.StructType([
                                      T.StructField('peer', T.StringType(), True),
                                      T.StructField('com_ent', T.ArrayType(T.StringType()), True)
                                  ])
                                  )
  return (r['peer'], {'df': df})


result_df = (
    spark_df
    .groupby('peer')
    .agg(
        F.collect_list('ent').alias('com_ent')
    )
    .rdd
    .map(createdf)

)

display(type(result_df.take(2)[0][1]['df']))
display(result_df.take(2))

Traceback (most recent call last):
  File "/usr/local/lib/python3.11/dist-packages/pyspark/serializers.py", line 459, in dumps
    return cloudpickle.dumps(obj, pickle_protocol)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/pyspark/cloudpickle/cloudpickle_fast.py", line 73, in dumps
    cp.dump(obj)
  File "/usr/local/lib/python3.11/dist-packages/pyspark/cloudpickle/cloudpickle_fast.py", line 632, in dump
    return Pickler.dump(self, obj)
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/pyspark/context.py", line 466, in __getnewargs__
    raise PySparkRuntimeError(
pyspark.errors.exceptions.base.PySparkRuntimeError: [CONTEXT_ONLY_VALID_ON_DRIVER] It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-

PicklingError: Could not serialize object: PySparkRuntimeError: [CONTEXT_ONLY_VALID_ON_DRIVER] It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.

#### Example 4
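In this example, the DataFrame creation is moved into a helper function defined outside the mapped function. The helper is still called from within map, so it still references the global Spark session and the same error occurs.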

In [None]:
def create_df(df):
  return spark.createDataFrame(df, schema = T.StructType([
                                      T.StructField('peer', T.StringType(), True),
                                      T.StructField('com_ent', T.ArrayType(T.StringType()), True)
                                  ])
                              )


def createdf(r):
  ent_coll = r['com_ent']
  peer = [r['peer']]  * len(ent_coll)

  df = pd.DataFrame({
      'peer': peer,
      'df': ent_coll
  })
  local_df = create_df(df)
  return (r['peer'], {'df': df})


result_df = (
    spark_df
    .groupby('peer')
    .agg(
        F.collect_list('ent').alias('com_ent')
    )
    .rdd
    .map(createdf)

)

display(type(result_df.take(2)[0][1]['df']))
display(result_df.take(2))

Traceback (most recent call last):
  File "/usr/local/lib/python3.11/dist-packages/pyspark/serializers.py", line 459, in dumps
    return cloudpickle.dumps(obj, pickle_protocol)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/pyspark/cloudpickle/cloudpickle_fast.py", line 73, in dumps
    cp.dump(obj)
  File "/usr/local/lib/python3.11/dist-packages/pyspark/cloudpickle/cloudpickle_fast.py", line 632, in dump
    return Pickler.dump(self, obj)
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/pyspark/context.py", line 466, in __getnewargs__
    raise PySparkRuntimeError(
pyspark.errors.exceptions.base.PySparkRuntimeError: [CONTEXT_ONLY_VALID_ON_DRIVER] It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-

PicklingError: Could not serialize object: PySparkRuntimeError: [CONTEXT_ONLY_VALID_ON_DRIVER] It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.

#### Example 5
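This example attempts to create a Spark session inside the helper function to generate the DataFrames. Nothing unpicklable is captured this time, so the job is submitted, but `getOrCreate()` then runs on a worker, where a SparkContext cannot be created. The failure therefore surfaces at run time rather than at pickling time.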

In [None]:
def create_df(df):
  local_spark = (SparkSession.builder
                .appName("local_df")
                .getOrCreate()
                )
  return local_spark.createDataFrame(df, schema = T.StructType([
                                      T.StructField('peer', T.StringType(), True),
                                      T.StructField('com_ent', T.ArrayType(T.StringType()), True)
                                  ])
                              )


def createdf(r):
  ent_coll = r['com_ent']
  peer = [r['peer']]  * len(ent_coll)

  df = pd.DataFrame({
      'peer': peer,
      'df': ent_coll
  })
  local_df = create_df(df)
  return (r['peer'], {'df': df})


result_df = (
    spark_df
    .groupby('peer')
    .agg(
        F.collect_list('ent').alias('com_ent')
    )
    .rdd
    .map(createdf)

)

display(type(result_df.take(2)[0][1]['df']))
display(result_df.take(2))



Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 65.0 failed 1 times, most recent failure: Lost task 0.0 in stage 65.0 (TID 57) (bb63f153af92 executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/lib/python3.11/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 1247, in main
    process()
  File "/usr/local/lib/python3.11/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 1239, in process
    serializer.dump_stream(out_iter, outfile)
  File "/usr/local/lib/python3.11/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 274, in dump_stream
    vs = list(itertools.islice(iterator, batch))
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/pyspark/rdd.py", line 2849, in takeUpToNumLeft
    yield next(iterator)
          ^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/util.py", line 83, in wrapper
    return f(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^
  File "<ipython-input-17-5e813614e078>", line 21, in createdf
  File "<ipython-input-17-5e813614e078>", line 4, in create_df
  File "/usr/local/lib/python3.11/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/session.py", line 497, in getOrCreate
    sc = SparkContext.getOrCreate(sparkConf)
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/context.py", line 515, in getOrCreate
    SparkContext(conf=conf or SparkConf())
  File "/usr/local/lib/python3.11/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/context.py", line 192, in __init__
    SparkContext._assert_on_driver()
  File "/usr/local/lib/python3.11/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/context.py", line 2581, in _assert_on_driver
    raise PySparkRuntimeError(
pyspark.errors.exceptions.base.PySparkRuntimeError: [CONTEXT_ONLY_VALID_ON_DRIVER] It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:572)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:784)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:766)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
	at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.api.python.PythonRDD$.$anonfun$runJob$1(PythonRDD.scala:181)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2433)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:989)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2393)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2414)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2433)
	at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:181)
	at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)


#### Preparing data for sc.parallelize
Collect the entire Spark DataFrame to the driver as a list of tuples, each holding a peer and its list of entitlement lists.

In [None]:

list_tuples =(spark_df
              .groupby('peer')
              .agg(
                  F.collect_list('ent').alias('com_ent')
              )
              .rdd
              .map(lambda c: (c['peer'], c['com_ent']))
              .collect()
             )

type(list_tuples)
spark.stop()



In [None]:
from pyspark import SparkContext
sc = SparkContext("local", "Parallelize Example")




#### Example A
This is a litmus test that returns a pandas DataFrame from a map over sc.parallelize, very similar to Zack's code.

In [None]:


def test_map(r):
  ent_coll = r[1]
  peer = [r[0]]  * len(ent_coll)
  df = pd.DataFrame({
      'peer': peer,
      'df': ent_coll
  })
  return (r[0], {'df': df})

(sc.parallelize(list_tuples, num_part)
 .map(test_map)
 .take(2)
)




[(0,
  {'df':     peer                                          df
   0      0                    [i, q, s, f, a, u, j, t]
   1      0     [v, s, p, d, k, w, g, f, r, j, c, o, e]
   2      0                    [a, k, b, y, g, w, u, n]
   3      0           [z, n, b, f, d, a, u, y, q, c, m]
   4      0                    [f, p, t, w, y, x, g, n]
   5      0                       [r, g, f, j, u, d, x]
   6      0           [s, e, r, y, m, n, l, q, h, b, z]
   7      0                 [v, s, e, y, g, d, l, p, r]
   8      0                 [o, r, t, s, q, l, v, m, k]
   9      0           [o, d, h, n, v, f, k, t, a, j, e]
   10     0        [y, o, e, r, b, h, x, m, a, u, f, q]
   11     0  [c, k, o, z, d, e, s, j, l, n, y, t, g, b]}),
 (5,
  {'df':     peer                                             df
   0      5     [c, i, x, t, l, w, h, m, z, g, a, y, n, d]
   1      5                       [q, c, a, k, p, h, t, l]
   2      5        [q, j, p, i, r, t, g, f, d, w, n, x, e]
   3      5
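
Since a pandas DataFrame can be built inside the mapped function, any pure-Python miner can run there as well; only Spark objects are off limits. To illustrate the shape such a function could take, the sketch below counts frequent single items and pairs by brute force. It is a stand-in and not FP-growth (mlxtend's `fpgrowth`, imported at the top of the notebook, would be a more realistic choice). `mine_group`, `min_support`, and `max_len` are hypothetical names.

```python
from collections import Counter
from itertools import combinations

def mine_group(record, min_support=0.5, max_len=2):
    """Return (peer, {itemset: support}) for one (peer, transactions) tuple."""
    peer, transactions = record
    counts = Counter()
    for basket in transactions:
        items = sorted(set(basket))  # de-duplicate and fix the item order
        for size in range(1, max_len + 1):
            counts.update(combinations(items, size))
    n = len(transactions)
    frequent = {
        itemset: count / n
        for itemset, count in counts.items()
        if count / n >= min_support
    }
    return (peer, frequent)

# Toy input in the same (peer, list of entitlement lists) shape as list_tuples.
sample = (0, [["a", "b", "c"], ["a", "b"], ["a", "d"], ["b", "c"]])
peer, frequent = mine_group(sample)
print(peer, frequent)
```

Applied to the collected data, this could be run as `sc.parallelize(list_tuples, num_part).map(mine_group).collect()`, because the function touches no Spark objects.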

#### Example B
This example attempts to create a Spark DataFrame inside the mapped function and return it.

In [None]:

def test_map(r):
  ent_coll = r[1]
  peer = [r[0]]  * len(ent_coll)
  df = pd.DataFrame({
      'peer': peer,
      'df': ent_coll
  })
  spark = (SparkSession.builder
          .appName("fpgrowth_experiment")
          .getOrCreate()
        )

  return (r[0], {'df': spark.createDataFrame(df)})

(sc.parallelize(list_tuples, num_part)
   .map(test_map)
   .take(2)
)



Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0) (bb63f153af92 executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/lib/python3.11/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 1247, in main
    process()
  File "/usr/local/lib/python3.11/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 1239, in process
    serializer.dump_stream(out_iter, outfile)
  File "/usr/local/lib/python3.11/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 274, in dump_stream
    vs = list(itertools.islice(iterator, batch))
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/pyspark/rdd.py", line 2849, in takeUpToNumLeft
    yield next(iterator)
          ^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/util.py", line 83, in wrapper
    return f(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^
  File "<ipython-input-20-66c9122c2815>", line 10, in test_map
  File "/usr/local/lib/python3.11/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/session.py", line 497, in getOrCreate
    sc = SparkContext.getOrCreate(sparkConf)
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/context.py", line 515, in getOrCreate
    SparkContext(conf=conf or SparkConf())
  File "/usr/local/lib/python3.11/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/context.py", line 192, in __init__
    SparkContext._assert_on_driver()
  File "/usr/local/lib/python3.11/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/context.py", line 2581, in _assert_on_driver
    raise PySparkRuntimeError(
pyspark.errors.exceptions.base.PySparkRuntimeError: [CONTEXT_ONLY_VALID_ON_DRIVER] It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:572)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:784)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:766)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
	at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.api.python.PythonRDD$.$anonfun$runJob$1(PythonRDD.scala:181)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2433)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:989)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2393)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2414)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2433)
	at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:181)
	at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/lib/python3.11/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 1247, in main
    process()
  File "/usr/local/lib/python3.11/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 1239, in process
    serializer.dump_stream(out_iter, outfile)
  File "/usr/local/lib/python3.11/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 274, in dump_stream
    vs = list(itertools.islice(iterator, batch))
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/pyspark/rdd.py", line 2849, in takeUpToNumLeft
    yield next(iterator)
          ^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/util.py", line 83, in wrapper
    return f(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^
  File "<ipython-input-20-66c9122c2815>", line 10, in test_map
  File "/usr/local/lib/python3.11/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/session.py", line 497, in getOrCreate
    sc = SparkContext.getOrCreate(sparkConf)
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/context.py", line 515, in getOrCreate
    SparkContext(conf=conf or SparkConf())
  File "/usr/local/lib/python3.11/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/context.py", line 192, in __init__
    SparkContext._assert_on_driver()
  File "/usr/local/lib/python3.11/dist-packages/pyspark/python/lib/pyspark.zip/pyspark/context.py", line 2581, in _assert_on_driver
    raise PySparkRuntimeError(
pyspark.errors.exceptions.base.PySparkRuntimeError: [CONTEXT_ONLY_VALID_ON_DRIVER] It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:572)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:784)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:766)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
	at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.api.python.PythonRDD$.$anonfun$runJob$1(PythonRDD.scala:181)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2433)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	... 1 more


#### Why the Loop Was Chosen


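Spark's FPGrowth is itself a distributed job launched from the driver, so it cannot be called from inside a task that runs on a worker. The `CONTEXT_ONLY_VALID_ON_DRIVER` error above shows exactly this: `test_map` tried to obtain a SparkSession inside a transformation.
Each fit also expects its input in a defined structure: a DataFrame with one array of distinct items per row.

**Spark Processing Limitations:**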

- `map` and `mapPartitions`: These transformations run a function per element or per partition on the workers. They suit plain element-wise logic but not a complex algorithm like FPGrowth that has to be run independently for each group.
- `foreachPartition`: This also operates at the partition level, but it is meant for side effects and does not return a new dataset.
- `sc.parallelize`: This only creates an RDD from a driver-side collection. It does not change where the per-group code runs, so it still fails when the tasks try to create Spark DataFrames in parallel.

**Data Grouping:**

- The data is grouped by a specific attribute, and each group requires an independent application of FPGrowth.
- Running FPGrowth on grouped data within Spark's parallel framework complicates the management of model inputs and outputs.

**Serialization Challenges:**

- Passing the FPGrowth model or its intermediate outputs across Spark's distributed tasks often leads to serialization issues.
- A loop avoids these issues: each group is fitted in turn from the driver, so no model or DataFrame has to be shipped to the workers.
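
The shape of such a per-group loop can be sketched in plain Python, without Spark. This is only an illustration under stated assumptions: `frequent_itemsets` is a hypothetical brute-force support counter standing in for FPGrowth (it is not the FP-Growth algorithm and not the Spark API), and the toy rows and helper names are made up to mirror the `peer` / `id` / `entitlements` layout used in this notebook.

```python
from collections import defaultdict
from itertools import combinations

# Toy rows shaped like the notebook's data: (peer, id, entitlement).
rows = [
    (0, "a1", "x"), (0, "a1", "y"),
    (0, "b2", "x"), (0, "b2", "y"),
    (0, "c3", "x"),
    (1, "a1", "z"), (1, "b2", "z"), (1, "b2", "w"),
]


def build_baskets(rows):
    """Group rows into {peer: {id: set of entitlements}}."""
    baskets = defaultdict(lambda: defaultdict(set))
    for peer, id_, entitlement in rows:
        baskets[peer][id_].add(entitlement)
    return baskets


def frequent_itemsets(transactions, min_support, max_size=2):
    """Hypothetical stand-in for FPGrowth: brute-force support counting."""
    counts = defaultdict(int)
    for items in transactions:
        for size in range(1, max_size + 1):
            for combo in combinations(sorted(items), size):
                counts[combo] += 1
    n = len(transactions)
    return {combo: c / n for combo, c in counts.items() if c / n >= min_support}


def mine_per_group(rows, min_support):
    """Sequential loop: one independent mining run per peer group."""
    results = {}
    for peer, by_id in build_baskets(rows).items():
        results[peer] = frequent_itemsets(list(by_id.values()), min_support)
    return results


results = mine_per_group(rows, min_support=0.6)
for peer, itemsets in sorted(results.items()):
    print(peer, itemsets)
```

Each group gets its own input and its own result, and nothing is shared between iterations. In the Spark version the call inside the loop would be the FPGrowth fit on the rows of one group, issued from the driver.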

#### Conclusion
- Using a loop was the most practical and reliable method I found for applying FPGrowth to grouped data in this scenario. I also experimented with various serialization approaches when recreating a DataFrame, but they either did not work as expected or stalled.

- **Performance Considerations**: While distributed processing can speed up many tasks, the overhead of initializing FPGrowth multiple times within a distributed environment outweighed the benefits for this use case.

