In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from pyspark.sql import SparkSession
import pyspark.sql.functions as F

In [None]:
from spark_plot import mpl
# NOTE(review): presumably applies spark_plot's default matplotlib styling to the
# figures drawn below — confirm against spark_plot.mpl.set_defaults.
mpl.set_defaults()

In [None]:
# Local Spark session for this notebook. Arrow is switched on because the notebook
# moves data between pandas and Spark (spark.createDataFrame(pandas_df) and
# DataFrame.toPandas()), and Arrow makes those conversions columnar.
spark = (
    SparkSession.builder
    .appName("nycflights-hist-dev")
    .config("spark.sql.execution.arrow.pyspark.enabled", "true")
    .getOrCreate()
)

In [None]:
from nycflights13 import flights as flights_pd

In [None]:
# Convert the pandas nycflights13 table into a Spark DataFrame (schema is inferred).
flights = spark.createDataFrame(data=flights_pd)

In [None]:
# Mark the converted frame for caching: the cells below run several Spark jobs on
# columns derived from it (bucket edges, bucketizing + aggregation). persist() is
# lazy, so the data is cached by the first action that uses it. The trailing ';'
# suppresses the DataFrame repr that persist() returns.
flights.persist();

In [None]:
from spark_plot import utils
from pyspark.ml.feature import Bucketizer

In [None]:
# Notebook parameters: the column to histogram, and the bin count handed to
# utils.spark_buckets when computing the split edges.
column, bins = "distance", 30

In [None]:
# Single-column projection of the flights frame; everything below works on it.
data = flights.select(column)

In [None]:
# Split edges for the histogram. Bucketizer (next cell) requires strictly increasing
# splits; k splits produce k - 1 buckets.
# NOTE(review): presumably returns bins + 1 finite edges spanning the column's
# min..max — confirm against utils.spark_buckets.
buckets = utils.spark_buckets(data, column, bins=bins)

In [None]:
# Map every value of `column` to the index of the split interval it falls in,
# written to a new double column "bucket". handleInvalid="error" makes the job
# fail instead of silently dropping or side-bucketing invalid values.
bucketizer = Bucketizer(
    splits=buckets,
    inputCol=column,
    outputCol="bucket",
    handleInvalid="error",
)
buckets_df = bucketizer.transform(data)

In [None]:
# Per-bucket count of non-null values of `column`; buckets with no rows are absent.
histogram = buckets_df.groupBy("bucket").agg(F.count(F.col(column)).alias("count"))

In [None]:
# Order rows by bucket index (sort() defaults to ascending).
histogram = histogram.sort("bucket")

In [None]:
# show() prints only 20 rows by default, which would truncate a 30-bucket histogram.
# There are at most len(buckets) - 1 buckets, so this row limit shows all of them.
histogram.show(len(buckets))

In [None]:
# Collect the per-bucket aggregate (at most len(buckets) - 1 rows) to the driver
# as a pandas frame for plotting, and display it.
hist_pd = histogram.toPandas()
hist_pd

In [None]:
# Bucketizer turns k split edges into k - 1 buckets with double-typed indices
# 0.0 .. k - 2, and the groupBy above emits no row for an empty bucket. Build the
# complete bucket index (float, so the merge keys match Bucketizer's output dtype)
# and fill absent counts with 0 so every bucket gets a finite weight.
fill_buckets = (
    pd.DataFrame({"bucket": np.arange(len(buckets) - 1, dtype=float)})
    .merge(hist_pd, on="bucket", how="left")
    .fillna({"count": 0})
)
weights = fill_buckets["count"].to_numpy()
assert len(weights) == len(buckets) - 1, "expected exactly one weight per bucket"

fig, ax = plt.subplots(1, 1)
# Re-draw the pre-aggregated counts: one sample at each bucket's left edge, weighted
# by that bucket's count, binned on the exact Spark split edges (not re-derived
# equal-width bins), so bars line up with the Bucketizer intervals.
ax.hist(buckets[:-1], bins=buckets, weights=weights)
ax.set(xlabel=column, ylabel="count", title=f"{column} histogram (Spark Bucketizer)");

## Compare to pandas

Reference plot of the same column drawn with pandas' built-in `DataFrame.hist`.

In [None]:
# Bin on the same split edges the Spark histogram used, so the two plots are directly
# comparable (a hard-coded bin count here would not match `bins`). The trailing ';'
# suppresses the array-of-Axes repr.
flights_pd.hist(column, bins=buckets, figsize=(10, 10));