# Configuration

In [1]:
# Most straightforward method
import pyspark
import pandas as pd

def print_pandas(dataframe, n=5):
    return dataframe.limit(n).toPandas()

In [2]:
spark = pyspark.sql.SparkSession.builder \
        .master('local') \
        .appName('Spark Datatype') \
        .getOrCreate()

# Import data

In [3]:
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType

# If datatype is not specified, string will be default
squared = udf(lambda x: x**2)
# squared = udf(lambda x: x**2, FloatType())

In [4]:
df_pd = pd.DataFrame(data=
                     {'integers': [1, 2, 3], 
                      'floats': [-1.0, 0.5, 2.7], 
                      'integer_array': [[1, 2], [3, 4, 5], [6, 7, 8, 9]], 
                      'str_array': [[], ['a'], ['a','b']], 
                      'literal_str_array': "[[], ['a'], ['a','b']]", 
                      'literal_str_array2': '"[[], [a], [a,b]]"', 
                      'strs': ['null', '', None]
                     })

df = spark.createDataFrame(df_pd)
df.printSchema()

root
 |-- integers: long (nullable = true)
 |-- floats: double (nullable = true)
 |-- integer_array: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- str_array: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- literal_str_array: string (nullable = true)
 |-- literal_str_array2: string (nullable = true)
 |-- strs: string (nullable = true)



In [5]:
print_pandas(df)

Unnamed: 0,integers,floats,integer_array,str_array,literal_str_array,literal_str_array2,strs
0,1,-1.0,"[1, 2]",[],"[[], ['a'], ['a','b']]","""[[], [a], [a,b]]""",
1,2,0.5,"[3, 4, 5]",[a],"[[], ['a'], ['a','b']]","""[[], [a], [a,b]]""",
2,3,2.7,"[6, 7, 8, 9]","[a, b]","[[], ['a'], ['a','b']]","""[[], [a], [a,b]]""",


In [6]:
print_pandas(df.withColumn('floats_sq', squared('floats')).withColumn('integers_sq', squared('integers')))

# df.withColumn('floats_sq', squared('floats'))\
#     .withColumn('integers_sq', squared('integers'))\
#     .printSchema()

Unnamed: 0,integers,floats,integer_array,str_array,literal_str_array,literal_str_array2,strs,floats_sq,integers_sq
0,1,-1.0,"[1, 2]",[],"[[], ['a'], ['a','b']]","""[[], [a], [a,b]]""",,1.0,1
1,2,0.5,"[3, 4, 5]",[a],"[[], ['a'], ['a','b']]","""[[], [a], [a,b]]""",,0.25,4
2,3,2.7,"[6, 7, 8, 9]","[a, b]","[[], ['a'], ['a','b']]","""[[], [a], [a,b]]""",,7.290000000000001,9


In [7]:
print_pandas(df.select(squared('floats').alias('floats_sq'), 
                       squared('integers').alias('integers_sq')))

# df.select(squared('floats').alias('floats_sq'), 
#           squared('integers').alias('integers_sq')).printSchema()

Unnamed: 0,floats_sq,integers_sq
0,1.0,1
1,0.25,4
2,7.290000000000001,9


In [8]:
from pyspark.sql.types import ArrayType, FloatType

squared_list = udf(lambda x: [v**2 for v in x])

# # If you specific FloatType, but v is Integer, it will return None
# squared_list = udf(lambda x: [v**2 for v in x], ArrayType(FloatType()))
# squared_list = udf(lambda x: [float(v)**2 for v in x], ArrayType(FloatType()))

In [9]:
print_pandas(df.withColumn('integer_arrays_sq', squared_list('integer_arrays')))
# df.withColumn('integer_arrays_sq', squared_list('integer_arrays')).printSchema()

AnalysisException: "cannot resolve '`integer_arrays`' given input columns: [floats, str_array, integers, literal_str_array2, strs, integer_array, literal_str_array];;\n'Project [integers#0L, floats#1, integer_array#2, str_array#3, literal_str_array#4, literal_str_array2#5, strs#6, <lambda>('integer_arrays) AS integer_arrays_sq#44]\n+- LogicalRDD [integers#0L, floats#1, integer_array#2, str_array#3, literal_str_array#4, literal_str_array2#5, strs#6], false\n"

In [10]:
for col in df.columns:
    print(col)

integers
floats
integer_array
str_array
literal_str_array
literal_str_array2
strs


In [11]:
for col in df.columns:
    print(type(df[col]))

<class 'pyspark.sql.column.Column'>
<class 'pyspark.sql.column.Column'>
<class 'pyspark.sql.column.Column'>
<class 'pyspark.sql.column.Column'>
<class 'pyspark.sql.column.Column'>
<class 'pyspark.sql.column.Column'>
<class 'pyspark.sql.column.Column'>
