In [1]:
from pyspark.sql import SparkSession
# Reuse the already-running Spark session if there is one; otherwise create it.
spark = (
    SparkSession.builder
    .getOrCreate()
)

# The SparkContext is the handle used further down for the RDD API.
sc = spark.sparkContext



ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:262


In [None]:
# One local list mixing ints, a str, a tuple and a dict.
collection = [
    1, 2, 3, 4,
    "five",
    ("six", 6),
    {"seven": 7},
]

# Hand the list to the SparkContext to obtain an RDD over it.
collection_rdd = sc.parallelize(collection)

# Print the RDD object itself (its string form), not its elements.
print(collection_rdd)

In [10]:
import pyspark.sql.functions as F
import pyspark.sql.types as T

# Every [numerator, denominator] pair with numerator in 0..99 and
# denominator in 1..99, so no denominator is ever zero.
fractions = [
    [numerator, denominator]
    for numerator in range(100)
    for denominator in range(1, 100)
]

# Load the pairs as two columns, then pack them into one array column
# named 'fraction'.
frac_df = (
    spark.createDataFrame(fractions, ['numerator', 'denominator'])
    .select(
        F.array(F.col('numerator'), F.col('denominator')).alias('fraction')
    )
)
frac_df.show(5, truncate=False)

+--------+
|fraction|
+--------+
|[0, 1]  |
|[0, 2]  |
|[0, 3]  |
|[0, 4]  |
|[0, 5]  |
+--------+
only showing top 5 rows



In [13]:
from fractions import Fraction
from typing import Tuple, Optional

# A fraction is represented as a (numerator, denominator) pair.
Frac = Tuple[int, int]

def py_reduce_fraction(frac: Optional[Frac]) -> Optional[Frac]:
    """Reduce a (numerator, denominator) pair to lowest terms.

    This function is wrapped as a Spark UDF, so it must tolerate missing
    values instead of raising: an exception here would fail the whole job.

    Args:
        frac: Two-element sequence ``(numerator, denominator)``, or ``None``
            for a missing value.

    Returns:
        The reduced ``(numerator, denominator)`` pair as produced by
        ``fractions.Fraction``, or ``None`` when ``frac`` is ``None``, when
        either component is ``None``, or when the denominator is zero.
    """
    if frac is None:
        return None
    num, denom = frac
    # `not denom` covers both a zero and a missing (None) denominator.
    if num is None or not denom:
        return None
    answer = Fraction(num, denom)
    return answer.numerator, answer.denominator

assert py_reduce_fraction((3,6)) == (1,2)
assert py_reduce_fraction((1,0)) is None
assert py_reduce_fraction(None) is None
assert py_reduce_fraction((None, 3)) is None

def py_fraction_to_float(frac: Optional[Frac]) -> Optional[float]:
    """Convert a (numerator, denominator) pair to its floating-point value.

    Missing values are tolerated in the same way as in
    ``py_reduce_fraction`` so the function does not raise on them.

    Args:
        frac: Two-element sequence ``(numerator, denominator)``, or ``None``
            for a missing value.

    Returns:
        ``numerator / denominator`` as a float, or ``None`` when ``frac`` is
        ``None``, when either component is ``None``, or when the denominator
        is zero.
    """
    if frac is None:
        return None
    num, denom = frac
    # `not denom` covers both a zero and a missing (None) denominator.
    if num is None or not denom:
        return None
    return num / denom

assert py_fraction_to_float((2,8)) == 0.25
assert py_fraction_to_float((10,0)) is None
assert py_fraction_to_float(None) is None
assert py_fraction_to_float((None, 4)) is None

# Spark-side type of a fraction: an array of long integers.
SparkFrac = T.ArrayType(elementType=T.LongType())

# Wrap the plain-Python reducer as a column-level UDF that returns SparkFrac.
reduce_fraction = F.udf(py_reduce_fraction, returnType=SparkFrac)

# Add the reduced form next to the original 'fraction' column.
frac_df = frac_df.withColumn(
    'reduced_fraction',
    reduce_fraction(F.col('fraction')),
)
frac_df.show(5, truncate=False)

+--------+----------------+
|fraction|reduced_fraction|
+--------+----------------+
|[0, 1]  |[0, 1]          |
|[0, 2]  |[0, 1]          |
|[0, 3]  |[0, 1]          |
|[0, 4]  |[0, 1]          |
|[0, 5]  |[0, 1]          |
+--------+----------------+
only showing top 5 rows

