In [1]:
# prompt: I need to import pyspark library and load sample file

!pip install pyspark
import pyspark
from pyspark.sql import SparkSession



Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m2.9 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425345 sha256=edcac57718d5d8f4ea4440f57438f51cfb479d1fb7a953f0af23ca05807de2d4
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


In [2]:
from pyspark.sql import SparkSession

# Spark properties applied to the session builder, in insertion order.
spark_settings = {
    "spark.executor.memory": "8g",
    "spark.executor.cores": "2",
    "spark.dynamicAllocation.enabled": "true",
}
# NOTE(review): when Spark runs in local mode, executor-level settings may have no
# effect and driver memory is what bounds the job — confirm before tuning.

session_builder = SparkSession.builder.appName("Segwise")
for key, value in spark_settings.items():
    session_builder = session_builder.config(key, value)
spark = session_builder.getOrCreate()


In [3]:
# Source data: the original playstore.csv, converted to Parquet beforehand.
PLAYSTORE_PARQUET_PATH = '/content/drive/MyDrive/google-play-dataset-by-tapivedotcom.parquet'
df = spark.read.parquet(PLAYSTORE_PARQUET_PATH)


In [4]:
# Define numerical fields for binning
from pyspark.sql.functions import col, when, expr
from pyspark.ml.feature import Bucketizer
from pyspark.sql.functions import udf, broadcast, col, count

import math

# Fields treated as numeric: each is cast to double and its NULLs replaced by 0.
numerical_fields = [
    'minInstalls',
    'price',
    'offersIAP',
    'ratings',
    'adSupported',
    'containsAds',
    'reviews',
    'free',
    'score',
    'releasedDay',
    'releasedYear',
    'maxprice',
    'histogram1',
    'histogram2',
    'histogram3',
    'histogram4',
    'histogram5',
]

# Cast first, then fill. DataFrame.na.fill(0, subset=...) silently ignores subset columns
# whose type does not match the numeric fill value, so filling before the cast would leave
# NULLs in any non-numeric (e.g. boolean or string) source column. After the cast every
# listed column is double, so the fill reaches all of them.
for field in numerical_fields:
    df = df.withColumn(field, col(field).cast('double'))
df = df.na.fill(0, subset=numerical_fields)
# Function to generate bin size for a column
def gen_bin_size(df, column, num_bins=5):
    """Return an integer bin width that splits ``column``'s value range into ``num_bins`` bins.

    Args:
        df: Spark DataFrame containing ``column``.
        column: name of a numeric column.
        num_bins: desired number of bins across ``[min, max]``.

    Returns:
        ``ceil((max - min) / num_bins)``, never less than 1. When the column has no
        non-null values (SQL ``min``/``max`` return NULL), 1 is returned instead of
        failing with a TypeError on ``None`` arithmetic.
    """
    min_val, max_val = df.selectExpr(f"min({column})", f"max({column})").collect()[0]
    if min_val is None or max_val is None:
        return 1
    return max(math.ceil((max_val - min_val) / num_bins), 1)

def gen_bin_size_dynamic(df, column, bin_percentage=5):
    """Return an integer bin width equal to ``bin_percentage`` % of ``column``'s value range.

    Args:
        df: Spark DataFrame containing ``column``.
        column: name of a numeric column.
        bin_percentage: bin width as a percentage of ``max - min`` (5 -> about 20 bins).

    Returns:
        ``ceil((max - min) * bin_percentage / 100)``, never less than 1. When the column
        has no non-null values (SQL ``min``/``max`` return NULL), 1 is returned instead of
        failing with a TypeError on ``None`` arithmetic.
    """
    min_val, max_val = df.selectExpr(f"min({column})", f"max({column})").collect()[0]
    if min_val is None or max_val is None:
        return 1
    return max(math.ceil((max_val - min_val) * bin_percentage / 100), 1)


# Define a function for binning numerical fields
def bin_numerical_fields(df, fields, num_bins=5):
    """Add a string ``<field>_bin`` interval-label column for each sufficiently varied field.

    A field is binned only when it has at least 20 distinct values (NULL counts as one
    distinct value). Bucket boundaries start at 0, or at the floor of the column minimum
    when that is negative. They step by ``gen_bin_size_dynamic(df, field, num_bins)``.
    The final boundary is ``int(max) + 1``, so the maximum always lands inside a bucket.

    Args:
        df: Spark DataFrame holding the numeric ``fields``.
        fields: column names to consider for binning.
        num_bins: despite its name, forwarded to ``gen_bin_size_dynamic`` as
            ``bin_percentage``, i.e. the bin width as a percentage of the value range
            (5 -> roughly 20 bins).

    Returns:
        ``(df, bin_columns)``: the DataFrame with the label columns added, and the names
        of the label columns in creation order. Fields whose range yields fewer than two
        buckets are skipped (with a printed message) instead of crashing Bucketizer.
    """
    from pyspark.sql import functions as F

    bin_columns = []
    for field in fields:
        # Only bin fields with at least 20 distinct values.
        if df.select(field).distinct().count() < 20:
            continue

        bin_size = gen_bin_size_dynamic(df, field, num_bins)
        print(f"Field: {field}, Bin Size: {bin_size}")

        min_val, max_val = df.selectExpr(f"min({field})", f"max({field})").collect()[0]
        # Start at 0, but extend below 0 for negative data: Bucketizer's default
        # handleInvalid="error" rejects values outside [splits[0], splits[-1]].
        start = min(0, math.floor(min_val))
        upper = int(max_val)
        splits = [float(i) for i in range(start, upper, bin_size)] + [float(upper + 1)]
        if len(splits) < 3:
            # Bucketizer requires at least three splits (two buckets), e.g. not met when
            # all values lie in [0, 1).
            print(f"Field: {field}, skipped: too few splits {splits}")
            continue

        # Bucketizer buckets are half-open [lo, hi) (the last one also includes hi).
        labels = [f"[{lo}, {hi})" for lo, hi in zip(splits[:-1], splits[1:])]

        out_col = field + '_bin'
        bucketizer = Bucketizer(splits=splits, inputCol=field, outputCol=out_col)
        df = bucketizer.transform(df)

        # Map bucket index -> label with native SQL functions rather than a Python UDF
        # (no per-row Python serialization). A NULL index maps to the first label.
        bucket_position = F.coalesce(F.col(out_col), F.lit(0.0)).cast("int") + 1
        label_array = F.array(*[F.lit(label) for label in labels])
        df = df.withColumn(out_col, F.element_at(label_array, bucket_position))

        bin_columns.append(out_col)

    return df, bin_columns

# Bin numerical fields
binned_df, bin_columns = bin_numerical_fields(df, numerical_fields)

# Categorical fields used for the volume filter
categorical_fields = ['genre']

# Keep only rows whose categorical value accounts for at least 2% of all rows. Each kept
# row gets that value's row count in a "count" column, which later cells rely on.
MIN_VOLUME_SHARE = 0.02
total_count = binned_df.count()  # loop-invariant: one Spark job, not one per field
filtered_df = binned_df
for field in categorical_fields:
    # Pre-filtering the small per-value count table and inner-joining it keeps the same
    # rows as a left join followed by the threshold filter (NULL keys never match either way).
    field_counts = (
        binned_df.groupBy(field).count()
        .filter(col("count") >= MIN_VOLUME_SHARE * total_count)
    )
    filtered_df = filtered_df.join(broadcast(field_counts), field, "inner")



Field: minInstalls, Bin Size: 500000000
Field: price, Bin Size: 55
Field: ratings, Bin Size: 8320873
Field: reviews, Bin Size: 219701
Field: score, Bin Size: 1
Field: releasedDay, Bin Size: 2
Field: maxprice, Bin Size: 52
Field: histogram1, Bin Size: 2663685
Field: histogram2, Bin Size: 230845
Field: histogram3, Bin Size: 402898
Field: histogram4, Bin Size: 841874
Field: histogram5, Bin Size: 5948483


In [5]:
# Keep only the *_bin label columns and the joined "count" column; drop everything else.
unneeded_columns = [
    name for name in filtered_df.columns
    if not ('_bin' in name or name == 'count')
]
filtered_df = filtered_df.drop(*unneeded_columns)

In [6]:
from pyspark.sql import functions as F

# Rewrite '(' -> '[' and ')' -> ']' in every bin-label column in a single projection.
# The numeric "count" column is skipped by name (translate would turn it into a string),
# and a distinct loop name avoids shadowing pyspark's `col` function.
filtered_df = filtered_df.select(*[
    name if name == 'count' else F.translate(name, '()', '[]').alias(name)
    for name in filtered_df.columns
])

In [7]:
# Columns describing the final output: bin labels, categorical fields, then the count.
# NOTE(review): not referenced by any later cell in this notebook.
output_columns = [*bin_columns, *categorical_fields, "count"]


In [8]:
# Display the label columns created by bin_numerical_fields
# (one per numerical field with at least 20 distinct values).
bin_columns

['minInstalls_bin',
 'price_bin',
 'ratings_bin',
 'reviews_bin',
 'score_bin',
 'releasedDay_bin',
 'maxprice_bin',
 'histogram1_bin',
 'histogram2_bin',
 'histogram3_bin',
 'histogram4_bin',
 'histogram5_bin']

In [9]:
from itertools import chain, combinations

# Step between the combination sizes that are generated. It is not a cap on subset size.
# Sizes 0, 10, 20, ... up to len(bin_columns) are considered; with the 12 bin columns
# listed above that leaves only size 10 (66 subsets). A smaller step gives more sizes,
# more subsets and more memory use, so it is kept large because of memory limits.
subset_limit = 10
# Size 0 (the empty subset) is skipped. Grouping by no columns yields a single grand-total
# row with every bin NULL, which the labelling step filters out anyway. It would only cost
# an extra Spark job.
subset_sizes = [r for r in range(0, len(bin_columns) + 1, subset_limit) if r > 0]
numerical_subsets = list(chain.from_iterable(combinations(bin_columns, r) for r in subset_sizes))


In [10]:
# Create the empty accumulator for the per-subset aggregates: one nullable string column
# per bin column, in bin_columns order, followed by the row count.
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType

# Derived from bin_columns, so the schema always matches the columns the binning step
# actually produced. LongType matches the type Spark's sum/count aggregates return.
final_schema = StructType(
    [StructField(name, StringType(), True) for name in bin_columns]
    + [StructField("finalCount", LongType(), True)]
)
final_df = spark.createDataFrame([], schema=final_schema)

In [11]:
from pyspark.sql import functions as F
from pyspark.sql.functions import lit

c = 0  # number of subsets unioned into final_df

for subset in numerical_subsets[::-1]:
    # One row per distinct combination of this subset's bin labels, with its row count.
    # The "count" column holds the genre's total size (joined in during filtering);
    # summing it would weight each row by its genre size, so rows are counted instead.
    subset_df = filtered_df.groupBy(*subset).agg(F.count(F.lit(1)).alias("finalCount"))

    # Bin columns that are not part of this subset are filled with NULL strings.
    for name in bin_columns:
        if name not in subset_df.columns:
            subset_df = subset_df.withColumn(name, lit(None).cast("string"))

    # union() matches columns by POSITION, and subset_df is ordered
    # (subset..., finalCount, missing bins...), so reorder to final_df's layout first.
    final_df = final_df.union(subset_df.select(*bin_columns, "finalCount"))
    c += 1


In [12]:
# Row count of the unioned aggregates. This forces every subset aggregation to run.
# NOTE(review): in the recorded run this cell was interrupted manually (KeyboardInterrupt
# below); consider skipping it for full runs.
final_df.count()

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/usr/local/lib/python3.10/dist-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/usr/lib/python3.10/socket.py", line 705, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: ignored

In [13]:
# Turn each bin column into a "<field>=<label>" string (NULL when that bin is not part of
# the combination). Drop rows where no bin is set, e.g. a grand-total row.
# The expressions are generated from one list so every label reads its own *_bin column.
labelled_fields = [
    'minInstalls', 'price', 'ratings', 'reviews', 'score', 'releasedDay',
    'maxprice', 'histogram1', 'histogram2', 'histogram3', 'histogram4', 'histogram5',
]
label_exprs = [
    f"CASE WHEN {name}_bin IS NOT NULL THEN concat('{name}=', cast({name}_bin as string)) END as a{i}"
    for i, name in enumerate(labelled_fields, start=1)
]
any_label_set = " or ".join(f"a{i} IS NOT NULL" for i in range(1, len(labelled_fields) + 1))

final_df = final_df.selectExpr(
    *label_exprs,
    "cast(finalCount as string) as finalCount",
).filter(any_label_set)


In [14]:
# Collapse the a1..a12 label columns into one ';'-separated "properties" string
# (concat_ws skips NULL labels) and expose the combination's count as "value".
properties_expr = ("concat_ws(';', a1, a2, a3, a4, a5, a6, "
                   "a7, a8, a9, a10,a11,a12) as properties")
value_expr = "cast(finalCount as string) as value"
final_df = final_df.selectExpr(properties_expr, value_expr)

In [None]:
# Write the (properties, value) rows as CSV part files under spark_out/.
# mode("overwrite") keeps the notebook re-runnable; the default save mode fails when
# the output directory already exists.
final_df.write.mode("overwrite").options(header='True', delimiter=',').csv("spark_out/")