<a href="https://colab.research.google.com/github/camiloakv/pyspark_cookbook/blob/main/pyspark_cookbook.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

>[Setup](#scrollTo=xPXzpKbYeBmx)

>[Create DataFrame](#scrollTo=srFXZBCJpxvu)

>>[Create from range](#scrollTo=gWPdNWv2Y0Q6)

>>[Create from list](#scrollTo=BODWWdkHY0RC)

>>[Create from list of tuples](#scrollTo=CH0erjoKY0RG)

>>[Add index column](#scrollTo=tiy2MgkxY0RI)

>>>[v0](#scrollTo=4yBO2gcnY0RJ)

>>>[v1](#scrollTo=vDm7Rg1TY0RJ)

>>[Add column with constant value](#scrollTo=ljeNbD39Y0RL)

>[Extract from DataFrame](#scrollTo=S7H-8G5FsVfL)

>>[Get first value of column](#scrollTo=eoj46n_eY0SI)

>>[Get list from column](#scrollTo=hH3cuI1cY0Sc)

>>[Get minimum/maximum from column](#scrollTo=jwtpz8qOY0Sd)

>>>[v0: list minimum](#scrollTo=dQxUqEtFY0Sd)

>>>[v1: agg](#scrollTo=HvAriCZ0Y0Se)

>[Transformations](#scrollTo=Llr04sOjtJWL)

>>[Shuffle rows](#scrollTo=7WC8rRMyY0Sb)

>>[value_counts](#scrollTo=6eAIIqMDY0Sg)

>>[Select n top rows per group](#scrollTo=wN4EfS8qY0Sh)

>>[Pivot table](#scrollTo=0BdIEuGJY0RM)

>>>[v0: use this!](#scrollTo=MgmtVcrBY0RM)

>>>[v1: optimized (Spark $\geq$ 2.0)](#scrollTo=kfvIt3YsY0RN)

>>>[v2: optimized](#scrollTo=lZFQ6rdqY0RO)

>>[Merge (concat)](#scrollTo=ZcH5tZ2EY0Se)

>>>[v0: by column position](#scrollTo=Riil-1mbWJbY)

>>>[v1: by column name (Spark >= 3.1)](#scrollTo=ZNAV8txkY0Sg)

>>[Joins](#scrollTo=gulioPLWY0SO)

>[Scratch / Work in progress (ignore)](#scrollTo=7EToxqmWaJPC)

>>[Normalize list](#scrollTo=MvgUrLK9Y0SK)

>>[Normalize column](#scrollTo=ZEuhAyeyY0SM)

>>[Cosine similarity](#scrollTo=QkBafww5Y0RP)



# Setup

https://medium.com/grabngoinfo/install-pyspark-3-on-google-colab-the-easy-way-577ec4a2bcd8

In [1]:
!pip install pyspark



In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
spark

In [3]:
sc = spark.sparkContext

In [4]:
from pyspark.sql import functions as sf
from pyspark.sql.types import IntegerType, DoubleType
from pyspark.sql.window import Window


# Create DataFrame

## Create from range

https://stackoverflow.com/questions/36803030/pyspark-randomize-rows-in-dataframe

In [5]:
df_from_range = sc.parallelize(range(10, 20)).map(lambda x: (x, )).toDF(["x"])

In [6]:
df_from_range.show()

+---+
|  x|
+---+
| 10|
| 11|
| 12|
| 13|
| 14|
| 15|
| 16|
| 17|
| 18|
| 19|
+---+



## Create from list

https://stackoverflow.com/questions/36803030/pyspark-randomize-rows-in-dataframe

In [7]:
mylist = [1, 1, 2, 3, 5, 8, 13, 21, 34]
df_from_list = spark.createDataFrame(mylist, IntegerType()).selectExpr("value as Fibonacci")

In [8]:
df_from_list.show()

+---------+
|Fibonacci|
+---------+
|        1|
|        1|
|        2|
|        3|
|        5|
|        8|
|       13|
|       21|
|       34|
+---------+



In [9]:
df_from_list2 = spark.createDataFrame([1.234, 5.678], DoubleType()).selectExpr("value as float_nums")

In [10]:
df_from_list2.show()

+----------+
|float_nums|
+----------+
|     1.234|
|     5.678|
+----------+



## Create from list of tuples

https://sparkbyexamples.com/pyspark/pyspark-create-dataframe-from-list/

In [11]:
lot = [
    (1, 1),
    (1, 2),
    (2, 2),
    (2, 1),
    (2, 3),
    (3, 5),
    (3, 6),
    (3, 3),
    (3, 4),
]
lot_cols = ["id", "item"]
df_lot = spark.createDataFrame(data=lot, schema=lot_cols)

In [12]:
df_lot.show()

+---+----+
| id|item|
+---+----+
|  1|   1|
|  1|   2|
|  2|   2|
|  2|   1|
|  2|   3|
|  3|   5|
|  3|   6|
|  3|   3|
|  3|   4|
+---+----+



## Add index column

https://stackoverflow.com/questions/43406887/spark-dataframe-how-to-add-a-index-column-aka-distributed-data-index

### v0

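Monotonically increasing and unique, but not necessarily consecutive: the partition ID goes into the upper bits and the record number within the partition into the lower 33 bits, hence the jump to 8589934592 ($2^{33}$) at the first row of the second partition: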

In [13]:
df_index = df_from_range.withColumn("index1", sf.monotonically_increasing_id())

In [14]:
df_index.show()

+---+----------+
|  x|    index1|
+---+----------+
| 10|         0|
| 11|         1|
| 12|         2|
| 13|         3|
| 14|         4|
| 15|8589934592|
| 16|8589934593|
| 17|8589934594|
| 18|8589934595|
| 19|8589934596|
+---+----------+



### v1
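Consecutive, starting from zero. Note that a window without `partitionBy` moves all rows into a single partition, which does not scale to large data: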

In [15]:
df_index2 = df_from_range.withColumn(
    "index2",
    sf.row_number().over(Window.orderBy(sf.monotonically_increasing_id())) - 1
)

In [16]:
df_index2.show()

+---+------+
|  x|index2|
+---+------+
| 10|     0|
| 11|     1|
| 12|     2|
| 13|     3|
| 14|     4|
| 15|     5|
| 16|     6|
| 17|     7|
| 18|     8|
| 19|     9|
+---+------+



## Add column with constant value

In [17]:
df_const = df_index2.withColumn("val", sf.lit(1))

In [18]:
df_const.show()

+---+------+---+
|  x|index2|val|
+---+------+---+
| 10|     0|  1|
| 11|     1|  1|
| 12|     2|  1|
| 13|     3|  1|
| 14|     4|  1|
| 15|     5|  1|
| 16|     6|  1|
| 17|     7|  1|
| 18|     8|  1|
| 19|     9|  1|
+---+------+---+



# Extract from DataFrame

## Get first value of column

https://stackoverflow.com/questions/56442215/how-to-get-first-value-and-last-value-from-dataframe-column-in-pyspark

In [19]:
first_record = df_const.first()['x']  # fetches one row instead of collecting the whole DataFrame

In [20]:
first_record, type(first_record)

(10, int)

## Get list from column

In [21]:
li = df_index.select("x").rdd.flatMap(lambda x: x).collect()

In [22]:
li

[10, 11, 12, 13, 14, 15, 16, 17, 18, 19]

## Get minimum/maximum from column
https://stackoverflow.com/questions/33224740/best-way-to-get-the-max-value-in-a-spark-dataframe-column

### v0: list minimum
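Collect the column into a Python list (see previous section), then call Python's built-in `min()` (or `max()`) on it. This pulls every value to the driver, so it only suits small columns.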

### v1: agg

In [23]:
df_index.agg({"x": "min"}).collect()[0]["min(x)"]

10

# Transformations

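## Shuffle rows

Sort by a random column. With a fixed seed, `rand` reproduces the same order only as long as the data and its partitioning stay the same.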

In [24]:
df_shuffle = df_index.orderBy(sf.rand(42))

In [25]:
df_shuffle.show()

+---+----------+
|  x|    index1|
+---+----------+
| 18|8589934595|
| 17|8589934594|
| 13|         3|
| 11|         1|
| 10|         0|
| 19|8589934596|
| 16|8589934593|
| 14|         4|
| 15|8589934592|
| 12|         2|
+---+----------+



## value_counts

The PySpark counterpart of pandas `value_counts()` is `groupBy` followed by `count`:

In [26]:
df_lot.groupBy("id").count().orderBy(sf.desc("count")).show()

+---+-----+
| id|count|
+---+-----+
|  3|    4|
|  2|    3|
|  1|    2|
+---+-----+



## Select n top rows per group

In [27]:
n_selected = 2
w = Window.partitionBy("id").orderBy(sf.col("item"))
df_selected = df_lot.withColumn("row", sf.row_number().over(w))\
  .filter(sf.col("row") <= n_selected)\
  .drop(sf.col("row"))

In [28]:
df_selected.show()

+---+----+
| id|item|
+---+----+
|  1|   1|
|  1|   2|
|  2|   1|
|  2|   2|
|  3|   3|
|  3|   4|
+---+----+



## Pivot table

https://sparkbyexamples.com/pyspark/pyspark-pivot-and-unpivot-dataframe/

### v0: use this!

In [29]:
df_pivot = df_const.groupBy("index2").pivot("x").sum("val").fillna(0)

In [30]:
df_pivot.show(10)

+------+---+---+---+---+---+---+---+---+---+---+
|index2| 10| 11| 12| 13| 14| 15| 16| 17| 18| 19|
+------+---+---+---+---+---+---+---+---+---+---+
|     0|  1|  0|  0|  0|  0|  0|  0|  0|  0|  0|
|     1|  0|  1|  0|  0|  0|  0|  0|  0|  0|  0|
|     2|  0|  0|  1|  0|  0|  0|  0|  0|  0|  0|
|     3|  0|  0|  0|  1|  0|  0|  0|  0|  0|  0|
|     4|  0|  0|  0|  0|  1|  0|  0|  0|  0|  0|
|     5|  0|  0|  0|  0|  0|  1|  0|  0|  0|  0|
|     6|  0|  0|  0|  0|  0|  0|  1|  0|  0|  0|
|     7|  0|  0|  0|  0|  0|  0|  0|  1|  0|  0|
|     8|  0|  0|  0|  0|  0|  0|  0|  0|  1|  0|
|     9|  0|  0|  0|  0|  0|  0|  0|  0|  0|  1|
+------+---+---+---+---+---+---+---+---+---+---+



In [31]:
# TESTING PERFORMANCE
#df_index3 = spark.createDataFrame(range(100), IntegerType()).selectExpr("value as x")
#df_index3 = df_index3.withColumn('index', sf.row_number().over(Window.orderBy(sf.monotonically_increasing_id())) - 1)
#df_const2 = df_index3.withColumn("val", sf.lit(1))
#df_pivot = df_const2.groupBy("index").pivot("x").sum("val").fillna(0)

In [32]:
#df_pivot.show(10)

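### v1: optimized (Spark $\geq$ 2.0)

Passing the list of pivot values explicitly spares Spark the extra job it otherwise runs to compute the distinct values of `x`: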

In [33]:
df_pivot1 = df_const.groupBy("index2").pivot("x", range(10, 20)).sum("val").fillna(0)

In [34]:
df_pivot1.show()

+------+---+---+---+---+---+---+---+---+---+---+
|index2| 10| 11| 12| 13| 14| 15| 16| 17| 18| 19|
+------+---+---+---+---+---+---+---+---+---+---+
|     0|  1|  0|  0|  0|  0|  0|  0|  0|  0|  0|
|     1|  0|  1|  0|  0|  0|  0|  0|  0|  0|  0|
|     2|  0|  0|  1|  0|  0|  0|  0|  0|  0|  0|
|     3|  0|  0|  0|  1|  0|  0|  0|  0|  0|  0|
|     4|  0|  0|  0|  0|  1|  0|  0|  0|  0|  0|
|     5|  0|  0|  0|  0|  0|  1|  0|  0|  0|  0|
|     6|  0|  0|  0|  0|  0|  0|  1|  0|  0|  0|
|     7|  0|  0|  0|  0|  0|  0|  0|  1|  0|  0|
|     8|  0|  0|  0|  0|  0|  0|  0|  0|  1|  0|
|     9|  0|  0|  0|  0|  0|  0|  0|  0|  0|  1|
+------+---+---+---+---+---+---+---+---+---+---+



### v2: optimized

Aggregate on (`index2`, `x`) first, then pivot the already reduced result:

In [35]:
df_pivot2 = df_const \
    .groupBy("index2", "x") \
    .sum("val") \
    .groupBy("index2") \
    .pivot("x") \
    .sum("sum(val)") \
    .fillna(0)

In [36]:
df_pivot2.show()

+------+---+---+---+---+---+---+---+---+---+---+
|index2| 10| 11| 12| 13| 14| 15| 16| 17| 18| 19|
+------+---+---+---+---+---+---+---+---+---+---+
|     0|  1|  0|  0|  0|  0|  0|  0|  0|  0|  0|
|     1|  0|  1|  0|  0|  0|  0|  0|  0|  0|  0|
|     2|  0|  0|  1|  0|  0|  0|  0|  0|  0|  0|
|     3|  0|  0|  0|  1|  0|  0|  0|  0|  0|  0|
|     4|  0|  0|  0|  0|  1|  0|  0|  0|  0|  0|
|     5|  0|  0|  0|  0|  0|  1|  0|  0|  0|  0|
|     6|  0|  0|  0|  0|  0|  0|  1|  0|  0|  0|
|     7|  0|  0|  0|  0|  0|  0|  0|  1|  0|  0|
|     8|  0|  0|  0|  0|  0|  0|  0|  0|  1|  0|
|     9|  0|  0|  0|  0|  0|  0|  0|  0|  0|  1|
+------+---+---+---+---+---+---+---+---+---+---+



__Note:__ v0 outperforms v2, use that one!

## Merge (concat)
https://walkenho.github.io/merging-multiple-dataframes-in-pyspark/

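### v0: by column position

`union` (alias `unionAll`) matches columns by position, not by name, so in the output below the values of `df_index2.index2` end up under `index1`: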

In [37]:
from functools import reduce
from pyspark.sql import DataFrame

In [38]:
df_index_concat = reduce(DataFrame.union, [df_index, df_index2])

In [39]:
df_index_concat.show()

+---+----------+
|  x|    index1|
+---+----------+
| 10|         0|
| 11|         1|
| 12|         2|
| 13|         3|
| 14|         4|
| 15|8589934592|
| 16|8589934593|
| 17|8589934594|
| 18|8589934595|
| 19|8589934596|
| 10|         0|
| 11|         1|
| 12|         2|
| 13|         3|
| 14|         4|
| 15|         5|
| 16|         6|
| 17|         7|
| 18|         8|
| 19|         9|
+---+----------+



### v1: by column name (Spark >= 3.1)
https://sparkbyexamples.com/spark/spark-merge-two-dataframes-with-different-columns/

In [40]:
df_merge = df_index.unionByName(df_index2, allowMissingColumns=True)

In [41]:
df_merge.show()

+---+----------+------+
|  x|    index1|index2|
+---+----------+------+
| 10|         0|  null|
| 11|         1|  null|
| 12|         2|  null|
| 13|         3|  null|
| 14|         4|  null|
| 15|8589934592|  null|
| 16|8589934593|  null|
| 17|8589934594|  null|
| 18|8589934595|  null|
| 19|8589934596|  null|
| 10|      null|     0|
| 11|      null|     1|
| 12|      null|     2|
| 13|      null|     3|
| 14|      null|     4|
| 15|      null|     5|
| 16|      null|     6|
| 17|      null|     7|
| 18|      null|     8|
| 19|      null|     9|
+---+----------+------+



## Joins

https://www.learnbymarketing.com/1100/pyspark-joins-by-example/

https://stackoverflow.com/questions/46944493/removing-duplicate-columns-after-a-df-join-in-spark

In [42]:
dfl = df_index.filter(sf.col("x") < 18)
dfr = df_index2.filter(sf.col("x") > 11)

In [43]:
dfl.join(dfr, ["x"], how='left').orderBy(sf.asc("x")).show()

+---+----------+------+
|  x|    index1|index2|
+---+----------+------+
| 10|         0|  null|
| 11|         1|  null|
| 12|         2|     2|
| 13|         3|     3|
| 14|         4|     4|
| 15|8589934592|     5|
| 16|8589934593|     6|
| 17|8589934594|     7|
+---+----------+------+



In [44]:
dfl.join(dfr, ["x"], how='right').orderBy(sf.asc("x")).show()

+---+----------+------+
|  x|    index1|index2|
+---+----------+------+
| 12|         2|     2|
| 13|         3|     3|
| 14|         4|     4|
| 15|8589934592|     5|
| 16|8589934593|     6|
| 17|8589934594|     7|
| 18|      null|     8|
| 19|      null|     9|
+---+----------+------+



In [45]:
dfl.join(dfr, ["x"], how='inner').orderBy(sf.asc("x")).show()  # default

+---+----------+------+
|  x|    index1|index2|
+---+----------+------+
| 12|         2|     2|
| 13|         3|     3|
| 14|         4|     4|
| 15|8589934592|     5|
| 16|8589934593|     6|
| 17|8589934594|     7|
+---+----------+------+



In [46]:
dfl.join(dfr, ["x"], how='outer').orderBy(sf.asc("x")).show()

+---+----------+------+
|  x|    index1|index2|
+---+----------+------+
| 10|         0|  null|
| 11|         1|  null|
| 12|         2|     2|
| 13|         3|     3|
| 14|         4|     4|
| 15|8589934592|     5|
| 16|8589934593|     6|
| 17|8589934594|     7|
| 18|      null|     8|
| 19|      null|     9|
+---+----------+------+



You can also emulate the other join types with a full outer join followed by a filter on the null side, although the direct join is simpler:

In [47]:
# equivalent to a left join, provided dfl's index1 column contains no nulls
dfl\
  .join(dfr, ["x"], how='full_outer')\
  .filter(sf.col("index1").isNotNull())\
  .orderBy(sf.asc("x"))\
  .show()

+---+----------+------+
|  x|    index1|index2|
+---+----------+------+
| 10|         0|  null|
| 11|         1|  null|
| 12|         2|     2|
| 13|         3|     3|
| 14|         4|     4|
| 15|8589934592|     5|
| 16|8589934593|     6|
| 17|8589934594|     7|
+---+----------+------+



In [48]:
#spark.stop()

___

# Scratch / Work in progress (ignore)

## Normalize list

In [None]:
from pyspark.mllib.feature import Normalizer

nor = Normalizer()

In [None]:
nor.transform([1,2,3,4])

DenseVector([0.1826, 0.3651, 0.5477, 0.7303])

## Normalize column

In [None]:
from pyspark.ml.feature import Normalizer
normalizer = Normalizer(inputCol="features", outputCol="f2_norm")
df_pivot_feats_norm = normalizer.transform(df_pivot_feats)

In [None]:
x = df_pivot_feats.collect()[0]['features']

In [None]:
type(x)

pyspark.ml.linalg.SparseVector

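__Note:__ `pyspark.ml.feature.Normalizer` transforms a DataFrame vector column (`SparseVector`/`DenseVector`, such as the `features` column built with `VectorAssembler` below), whereas the `pyspark.mllib` version above works on plain lists.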

## Cosine similarity

In [None]:
from pyspark.ml.feature import VectorAssembler

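# df_pivot's key column is "index2"; rename it to "index", the name the cells below use
df_pivot_idx = df_pivot.withColumnRenamed("index2", "index")

# Assemble every pivoted column (all but the key) into one vector column
assembler = VectorAssembler(
    inputCols=[c for c in df_pivot_idx.columns if c != "index"],
    outputCol="features"
)

df_pivot_feats = assembler.transform(df_pivot_idx)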

In [None]:
df_pivot_feats.show()

+-----+---+---+---+---+---+---+---+---+---+---+--------------+
|index| 10| 11| 12| 13| 14| 15| 16| 17| 18| 19|      features|
+-----+---+---+---+---+---+---+---+---+---+---+--------------+
|    0|  1|  0|  0|  0|  0|  0|  0|  0|  0|  0|(10,[0],[1.0])|
|    1|  0|  1|  0|  0|  0|  0|  0|  0|  0|  0|(10,[1],[1.0])|
|    2|  0|  0|  1|  0|  0|  0|  0|  0|  0|  0|(10,[2],[1.0])|
|    3|  0|  0|  0|  1|  0|  0|  0|  0|  0|  0|(10,[3],[1.0])|
|    4|  0|  0|  0|  0|  1|  0|  0|  0|  0|  0|(10,[4],[1.0])|
|    5|  0|  0|  0|  0|  0|  1|  0|  0|  0|  0|(10,[5],[1.0])|
|    6|  0|  0|  0|  0|  0|  0|  1|  0|  0|  0|(10,[6],[1.0])|
|    7|  0|  0|  0|  0|  0|  0|  0|  1|  0|  0|(10,[7],[1.0])|
|    8|  0|  0|  0|  0|  0|  0|  0|  0|  1|  0|(10,[8],[1.0])|
|    9|  0|  0|  0|  0|  0|  0|  0|  0|  0|  1|(10,[9],[1.0])|
+-----+---+---+---+---+---+---+---+---+---+---+--------------+



In [None]:
from pyspark.ml.linalg import SparseVector, DenseVector

In [None]:
##df_pivot_feats.withColumn("f2", DenseVector(sf.col("features").toArray())).show()
#df_pivot_feats.withColumn("f2", DenseVector(sf.col("features"))).show()

Convert to dense

https://stackoverflow.com/questions/58490770/convert-pyspark-densevector-to-array

In [None]:
## v0
#import pyspark.sql.types as st
#
#to_array = sf.udf(lambda v: v.toArray().tolist(), st.ArrayType(st.FloatType()))
#df_pivot_feats = df_pivot_feats.withColumn('f2', to_array('features'))

# v1 (spark >= 3.0)
from pyspark.ml.functions import vector_to_array
df_pivot_feats = df_pivot_feats.withColumn('f2', vector_to_array('features'))

https://stackoverflow.com/questions/46758768/calculating-the-cosine-similarity-between-all-the-rows-of-a-dataframe-in-pyspark

In [None]:
from pyspark.mllib.linalg.distributed import IndexedRowMatrix
mat = IndexedRowMatrix(df_pivot_feats.select("index", "f2").rdd).toBlockMatrix()
dot = mat.multiply(mat.transpose())
#similarity_matrix = dot.toLocalMatrix().toArray()

In [None]:
dot.numCols(), dot.numRows()

(10, 10)

In [None]:
def vvec(nrows, def_val=1):

    df_vvec = spark.createDataFrame(range(nrows), IntegerType()).selectExpr("value as index")

    df_vvec = df_vvec.withColumn("val", sf.lit(def_val))

    from pyspark.ml.feature import VectorAssembler

    assembler = VectorAssembler(
        inputCols=[x for x in df_vvec.columns if x not in ["index"]],
        outputCol="features"
    )

    df_vvec = assembler.transform(df_vvec)

    from pyspark.ml.functions import vector_to_array
    df_vvec = df_vvec.withColumn('f2', vector_to_array('features'))

    vvec = IndexedRowMatrix(df_vvec.select("index", "f2").rdd).toBlockMatrix()

    return vvec

In [None]:
vv = vvec(dot.numCols())
soma_sims = dot.multiply(vv)
soma_sims = vv.transpose().multiply(soma_sims)
soma_sims = soma_sims.toLocalMatrix().toArray()
soma_sims = soma_sims[0][0]

In [None]:
soma_sims

10.0

In [None]:
mat.numCols()

10

In [None]:
mat.numRows()

10

Alternatively, to get the mean similarity:

https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.mllib.linalg.distributed.IndexedRowMatrix.html

In [None]:
#spark.stop()

___

In [None]:
mat2 = IndexedRowMatrix(df_pivot_feats.select("index", "f2").rdd)

In [None]:
cs = mat2.columnSimilarities()

In [None]:
dir(cs)

In [None]:
df_pivot_feats = df_pivot_feats.withColumn("index", sf.col("index").cast('bigint'))

In [None]:
r2 = df_pivot_feats.select("index", "features").rdd

In [None]:
type(r2)

In [None]:
from pyspark.mllib.linalg.distributed import IndexedRowMatrix
mat = IndexedRowMatrix(df_pivot_feats.select("index", "f2").rdd)

In [None]:
dfx = df_pivot_feats.select("index", "features").rdd.map(tuple)

In [None]:
type(dfx)

In [None]:
IndexedRowMatrix?

In [None]:
df_pivot_feats.dtypes

In [None]:
r2 = df_pivot_feats.select("index", "features").rdd

In [None]:
type(r2)

In [None]:
mat = IndexedRowMatrix(r2)

https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.mllib.linalg.distributed.IndexedRowMatrix.html

In [None]:
from pyspark.mllib.linalg.distributed import IndexedRow

rows = sc.parallelize([IndexedRow(0, [1, 2, 3]),
                       IndexedRow(6, [4, 5, 6])])

In [None]:
type(rows)

In [None]:
rows.toDF().show()

In [None]:
df_pivot_feats.show()

In [None]:
mat = IndexedRowMatrix(rows)

In [None]:
cs = mat.columnSimilarities()

In [None]:
type(cs)

In [None]:
import numpy as np

In [None]:
cs.toBlockMatrix().toLocalMatrix().toArray()  # already a numpy.ndarray

In [None]:
data.dtypes

In [None]:
rows

In [None]:
print(rows.collect())

In [None]:
r2

In [None]:
print(r2.collect())

In [None]:
mat = IndexedRowMatrix(r2)

In [None]:
data = df_pivot_feats.select("index", "features")

from pyspark.mllib.linalg.distributed import IndexedRowMatrix
mat = IndexedRowMatrix(data).toBlockMatrix()
dot = mat.multiply(mat.transpose())
dot.toLocalMatrix().toArray()

#from pyspark.mllib.linalg.distributed import IndexedRow, IndexedRowMatrix
#mat = IndexedRowMatrix(
#    data.select("index", "features")\
#        .rdd.map(lambda row: IndexedRow(row.index, row.features))).toBlockMatrix()
##        .rdd.map(lambda row: IndexedRow(row.index, row.features.toArray()))).toBlockMatrix()
#dot = mat.multiply(mat.transpose())
#dot.toLocalMatrix().toArray()

(Pearson correlation)

In [None]:
from pyspark.ml.linalg import DenseMatrix, Vectors
from pyspark.ml.stat import Correlation

In [None]:
pearsonCorr = Correlation.corr(df_pivot_feats, 'features', 'pearson').collect()[0][0]

In [None]:
type(pearsonCorr)

In [None]:
pearsonCorr

In [None]:
print(str(pearsonCorr).replace('nan', 'NaN'))

MISC

`df_temp.selectExpr('percentile(P2, 0.90)').show()`