In [1]:
import os

import pyspark.sql.functions as F

### Mounting folders from S3 bucket

In [3]:
def mountBucket(accesskey, secretkey, bucketName, mountFolder):
  """Mount an S3 bucket (or a path inside a bucket) onto a DBFS folder.

  An existing mount at ``mountFolder`` is removed first, so the call can be
  repeated. A failed unmount is reported and otherwise ignored.

  Args:
    accesskey: AWS access key id.
    secretkey: AWS secret access key. It is percent-encoded before being
      embedded in the URI because secret keys may contain ``/`` or ``+``,
      which would otherwise corrupt the ``s3a://key:secret@bucket`` URI.
    bucketName: Bucket name, optionally followed by a key prefix.
    mountFolder: DBFS mount point, e.g. ``/mnt/cdr``.
  """
  # Local import keeps this helper usable when the cell is run on its own.
  from urllib.parse import quote

  print ("Mounting", bucketName)
  try:
    # Unmount the data in case it was already mounted.
    dbutils.fs.unmount(mountFolder)
  except Exception:
    # If it fails to unmount it most likely wasn't mounted in the first place.
    # `Exception` (not a bare except) lets Ctrl-C / SystemExit propagate.
    print ("Directory not unmounted: ", mountFolder )

  # Lastly, mount our bucket.
  encoded_secret = quote(secretkey, safe="")
  dbutils.fs.mount("s3a://" + accesskey + ":" + encoded_secret + "@" + bucketName, mountFolder)
  print ("The bucket", bucketName, "was mounted to", mountFolder, "\n")

# Set AWS programmatic access credentials.
# The standard AWS environment variables are preferred so that real keys never
# have to be pasted into (and saved with) the notebook; the placeholders are
# only used when the variables are not set.
access_key_id = os.environ.get("AWS_ACCESS_KEY_ID", "XXXXXXXXXXXXXXXXXXXXXXXXXXXXX")
secret_access_key = os.environ.get("AWS_SECRET_ACCESS_KEY", "YYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYY")

# Mount buckets
mountBucket(access_key_id, secret_access_key, "weclouddata/datasets/telecom/CDR", "/mnt/cdr")

# The wikipedia pageviews data is read further down in this notebook, so it has
# to be mounted too for the notebook to run from a clean workspace.
mountBucket(access_key_id, secret_access_key, "weclouddata/datasets/wikipedia", "/mnt/wikipedia")
# mountBucket(access_key_id, secret_access_key, "weclouddata/datasets/entertainment/movie", "/mnt/movie")
# mountBucket(access_key_id, secret_access_key, "weclouddata/datasets/telecom/anonymous_telecom", "/mnt/anonymous_telecom")
# mountBucket(access_key_id, secret_access_key, "weclouddata/datasets/telecom/churn/china_telecom", "/mnt/china_telecom_churn")
# mountBucket(access_key_id, secret_access_key, "weclouddata/datasets/social/twitter", "/mnt/twitter")
# mountBucket(access_key_id, secret_access_key, "weclouddata/datasets/logs/aws_logs", "/mnt/aws_logs")
# mountBucket(access_key_id, secret_access_key, "weclouddata/datasets/random/shakespeare", "/mnt/shakespeare")
# mountBucket(access_key_id, secret_access_key, "weclouddata/datasets/geo", "/mnt/geo")
# mountBucket(access_key_id, secret_access_key, "weclouddata/datasets/telecom/churn/churn_orange_telecom_sample", "/mnt/orange")
# mountBucket(access_key_id, secret_access_key, "weclouddata/datasets/social/stackoverflow", "/mnt/stackoverflow")



In [4]:
%fs ls '/mnt/cdr'

path,name,size
dbfs:/mnt/cdr/cdr_by_grid_december/,cdr_by_grid_december/,0
dbfs:/mnt/cdr/cdr_by_grid_november/,cdr_by_grid_november/,0
dbfs:/mnt/cdr/cdr_grid_to_grid_2013_december_full.zip,cdr_grid_to_grid_2013_december_full.zip,48988967347
dbfs:/mnt/cdr/cdr_grid_to_grid_2013_november_full.zip,cdr_grid_to_grid_2013_november_full.zip,51059323347
dbfs:/mnt/cdr/mi_meteo_legend.csv,mi_meteo_legend.csv,2295
dbfs:/mnt/cdr/milano-grid.zip,milano-grid.zip,332144
dbfs:/mnt/cdr/milano_weather_station.zip,milano_weather_station.zip,157313
dbfs:/mnt/cdr/zip/,zip/,0


### Accessing mounted data

In [6]:
# List the mounted CDR folder and print the full path of every entry
cdr_files = dbutils.fs.ls("/mnt/cdr")

for entry in cdr_files:
  entry_path = entry.path
  print(entry_path)


In [7]:
# Render the listing obtained above (path, name, size) with the notebook's display() helper
display( cdr_files )

path,name,size
dbfs:/mnt/cdr/cdr_by_grid_december/,cdr_by_grid_december/,0
dbfs:/mnt/cdr/cdr_by_grid_november/,cdr_by_grid_november/,0
dbfs:/mnt/cdr/cdr_grid_to_grid_2013_december_full.zip,cdr_grid_to_grid_2013_december_full.zip,48988967347
dbfs:/mnt/cdr/cdr_grid_to_grid_2013_november_full.zip,cdr_grid_to_grid_2013_november_full.zip,51059323347
dbfs:/mnt/cdr/mi_meteo_legend.csv,mi_meteo_legend.csv,2295
dbfs:/mnt/cdr/milano-grid.zip,milano-grid.zip,332144
dbfs:/mnt/cdr/milano_weather_station.zip,milano_weather_station.zip,157313
dbfs:/mnt/cdr/zip/,zip/,0


In [8]:
# Obtain (or reuse) the SparkSession and expose its SparkContext as `sc`

from pyspark.sql import SparkSession
spark = (
    SparkSession.builder
    .appName('Spark tutorial')
    .getOrCreate()
)
print('Session created')

sc = spark.sparkContext

#### Manually create spark dataframes

https://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html#pyspark.sql.SparkSession.createDataFrame  
https://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame.toDF

In [10]:
# DataFrame built directly from a list of tuples plus a list of column names
data = [("jose", 1), ("li", 2), ("liz", 3)]
myDf = spark.createDataFrame(data, ["name", "age"])

# DataFrame built from an RDD of tuples (this rebinds `myDf`)
rdd = sc.parallelize([(1, 2, 3), (4, 5, 6), (7, 8, 9)])
myDf = rdd.toDF(schema=["a", "b", "c"])

#### Create DF from files

https://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html#pyspark.sql.DataFrameReader

There are two ways of using the methods in DataFrameReader. All the **options** for a method like .csv should be chained before using .csv().  
It will take more time to create a spark-data-frame (SDF) if the option inferSchema is set to be true. If inferSchema is false, all the columns are read as strings.

In [12]:
fName = "/mnt/wikipedia/pageviews/pageviews_by_second.csv"

# Way 1: hand every reader option to .csv() as a keyword argument
wikiDF = spark.read.csv(fName, sep='\t', header='true', inferSchema='false')

# Way 2: chain .option(...) calls on the reader and finish with .csv()
wikiDF = (
    spark.read
    .option('sep', '\t')
    .option('header', 'true')
    .option('inferSchema', 'false')
    .csv(fName)
)

**Use `StructType` and `StructField` to define schema for the DF**  
If there are a large no. of columns, create a csv file with the schema specification (colName, dType, nullable) and create a list of StructField objects by reading the csv file

In [14]:
from pyspark.sql.types import StructType, StructField, IntegerType, LongType, FloatType

cdrFile = '/mnt/cdr/cdr_by_grid_december/sms-call-internet-mi-2013-12-01.txt'

# (column name, Spark type) pairs in file order; every column is declared nullable
_cdrColumns = [
    ("square_id", IntegerType()),
    ("time_interval", LongType()),
    ("country_code", IntegerType()),
    ("sms_in_activity", FloatType()),
    ("sms_out_activity", FloatType()),
    ("call_in_activity", FloatType()),
    ("call_out_activity", FloatType()),
    ("internet_activity", FloatType()),
]
cdrSchema = StructType([
    StructField(colName, colType, True) for colName, colType in _cdrColumns
])

# Tab-delimited file with a header line, read with the explicit schema above
cdr = (
    spark.read
    .schema(cdrSchema)
    .options(header="true", delimiter="\t")
    .csv(cdrFile)
)

**Read Parquet files**
* We do not need to specify the schema - the column names and data types are stored in the parquet files.
* Only one job is required to read that schema from the parquet file's metadata.
* Unlike the CSV or JSON readers that have to load the entire file and then infer the schema, the parquet reader can "read" the schema very quickly because it's reading that schema from the metadata.

In [16]:
wikiParquetFiles = "/mnt/wikipedia/pageviews/pageviews_by_second.parquet/"

# Column names and types come from the Parquet metadata, so no options are needed.
wikiParquet = (spark.read
  .parquet(wikiParquetFiles)
)

# Explicitly specify the schema
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType
wikiSchema = StructType([
  StructField("timestamp", TimestampType(), False),
  StructField("site", StringType(), False),
  StructField("requests", IntegerType(), False)
])
# "delimiter" is a CSV option and has no meaning for Parquet, so it is not set here.
wikiParquet = (spark.read
  .schema(wikiSchema)          # Use the specified schema
  .parquet(wikiParquetFiles)   # Creates a DataFrame from Parquet after reading in the file
)

**Writing a DF to disk**  
https://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html#pyspark.sql.DataFrameWriter

In [19]:
wikiOut = "/tmp/pageviews_by_second.csv"
(wikiDF.write                       # Our DataFrameWriter
  .option("delimiter", "\t")       # Tab-separated output
  .option("header", "true")        # Emit the column names as the first line
  .mode("overwrite")               # Replace existing files
  .csv(wikiOut)                    # Write DataFrame to csv files
)

wikiOutParquet = "/tmp/pageviews_by_second.parquet"
# "delimiter" is a CSV option and has no meaning for Parquet, so it is not set here.
(wikiDF.write                       # Our DataFrameWriter
  .option("compression", "snappy") # Compression codec for the Parquet files
  .mode("overwrite")               # Replace existing files
  .parquet(wikiOutParquet)         # Write DataFrame to parquet files
)

# Action operations  
DataFrame actions either return a result to the driver or write to disk.

* Display the first n rows of the DF
* Return a list of first n **Row objects** from a DF
* Return the no of records in a DF

In [21]:
# Display the first n rows of the DF
cdr.show(5, truncate=True) # string values longer than 20 characters are truncated in the printed table

# Return a list of first n Row objects from a DF (the rows are brought to the driver)
rowsList = cdr.take(5)
print(rowsList)
print()

# Return the no of records in a DF
nRows = cdr.count()
print(nRows)

In [22]:
# Extracting elements of a Row object

from pyspark.sql import Row

a_row = Row(foo=1, bar=True)
# Use the public access forms rather than calling __getattr__/__getitem__ directly.
a_row.foo                # attribute access
getattr(a_row, "foo")    # attribute access when the field name is held in a string
a_row["foo"]             # item access by field name (last expression, so it is displayed)

# Transformation operations  
* Create a new DF with top n Rows
* Create a new DF with chosen columns
* Create a new DF after dropping a few columns
* Create a new DF with distinct rows from original sdf
* Create a new DF with rows ordered

In [24]:
# Note the difference between limit() and take(). limit() is a transformation and take() is an action.
cdr.createOrReplaceTempView("cdrTable")

# --- Top n rows: DataFrame API, then the equivalent SQL ---
cdr5Rows = cdr.limit(5)
cdr5Rows.show()
cdr5Rows = spark.sql("""
  SELECT *
  FROM cdrTable
  LIMIT 5
""")
cdr5Rows.show()


# --- Chosen columns: DataFrame API, then the equivalent SQL ---
myCols = ["square_id", "country_code", "call_out_activity"]
myColsDF = cdr.select(*myCols)
# The SQL result is assigned (as for cdr5Rows above) instead of being silently discarded.
myColsDF = spark.sql("""
  SELECT square_id,
    country_code,
    call_out_activity
  FROM cdrTable
""")
allColsDF = cdr.select(F.col('*'))  # col('*') selects all the columns of DF


# --- Drop columns ---
droppedColsDF = cdr.drop(*myCols)
# Cannot select all columns except a few, using SQL query.


# --- Distinct rows: distinct() and drop_duplicates() shown side by side ---
cdr.select('square_id', 'country_code').distinct().show()
cdr.select('square_id', 'country_code').drop_duplicates().show()


# --- Ordering ---
# sort and orderBy both take either a column name or a column object as argument.
cdr.sort('country_code').show()
cdr.sort(F.col('country_code')).show()
cdr.sort('country_code', ascending=False).show()
cdr.orderBy('country_code').show()
cdr.orderBy(F.col('country_code')).show()
cdr.orderBy(F.col('country_code'), ascending=False).show()

**`df.colName` and `df['colName']` only create column objects.**  
`df.colName.show()` and `df['colName'].show()` will not work

# Create new columns in DataFrames
These are transformations that take a df as input and create another df with derived columns as output.

In [27]:
from pyspark.sql.functions import col, concat, lit

# Ratio of incoming to outgoing SMS activity, reused by both variants below
smsRatio = F.col('sms_in_activity') / F.col('sms_out_activity')

# Variant 1: select every existing column plus the derived one
cdr.select(F.col('*'), smsRatio.alias('sms_ratio')).show(3)

# Variant 2: withColumn appends the derived column to the existing ones
cdr.withColumn('sms_ratio', smsRatio).show(3)

# Arithmetic between a column and a Python literal
cdr.withColumn('newColId2', F.col('square_id') + 2).show(3)

# A constant has to be wrapped in lit() to be used as a column inside concat()
joinedIds = F.concat(F.col('square_id'), F.lit('  '), F.col('country_code'))
cdr.withColumn("newCol", joinedIds).show(3)

# Boolean column flagging null values
smsOutIsNull = F.col('sms_out_activity').isNull()
cdr.select(smsOutIsNull.alias('newCol')).show(3)


# Filtering a DataFrame

`myDF.filter(sql like condition)`

In [29]:
# Conditions written as SQL expression strings
cdr.filter("country_code in (46, 39)").show(5)
cdr.where("sms_out_activity is null and sms_in_activity is null").show(5)

# The null check again, this time built from Column objects;
# it is passed to both filter() and where()
bothSmsNull = F.col('sms_in_activity').isNull() & F.col('sms_out_activity').isNull()
cdr.filter(bothSmsNull).show(5)
cdr.where(bothSmsNull).show(5)


# Rename columns

In [31]:
# Using select, col and alias
# Only the listed columns are kept. The expression is not assigned or shown;
# it only illustrates the syntax.
(cdr
  .select(
    "square_id",
    F.col("time_interval").alias("ts"), 
    "country_code"
  )
)

# Using withColumnRenamed()
# all the columns of cdr will be present in the resultant DF along with the renamed column.
(cdr
  .withColumnRenamed("time_interval", "ts")
)

# rename all columns with the same expression
cdr.select(*[F.col(c).alias(c.replace('_', '.')) for c in cdr.columns]).show(5)

# rename multiple columns
# mySDF.select([F.col(oldColName).alias(newColName) for oldColName, newColName in zip(oldColNames, newColNames)])

# A single withColumnRenamed cannot be used to rename multiple column names.
# Uses the `spark` session created earlier in this notebook rather than the
# legacy `sqlContext` global, which this notebook never defines.
data = spark.createDataFrame([(1,2), (3,4)], ['x1', 'x2'])
data = (data
       .withColumnRenamed('x1','x3')
       .withColumnRenamed('x2', 'x4'))

functions module in pyspark
https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#module-pyspark.sql.functions

In [33]:
# Example of applying functions on columns of DF

# 1. Turn the long `time_interval` (divided by 1000 before the cast) into a timestamp,
#    then derive a date column and a formatted date string from it
# 2. Extract year, month, dayofyear and dayofweek from the timestamp

from pyspark.sql.functions import unix_timestamp, to_date, date_format, month, year, dayofyear, dayofweek
from pyspark.sql.types import TimestampType

# Shared starting point for both examples
cdrWithTs = cdr.withColumn("ts", (F.col("time_interval")/1000).cast(TimestampType()))

(cdrWithTs
    .withColumn("date1", to_date(F.col("ts")))
    .withColumn("date2", F.date_format(F.col("date1"), 'yyyy/MM/dd'))
    .show(5)
)

tsCol = F.col("ts")
(cdrWithTs
    .select(
        F.col("*"),
        F.year(tsCol).alias("year"),
        F.month(tsCol).alias("month"),
        F.dayofyear(tsCol).alias("dayofyear"),
        F.dayofweek(tsCol).alias("dayofweek"),
    )
    .show(5)
)

# Aggregation

The `groupBy()` function is a **wide** transformation - it produces a shuffle and marks a stage boundary.

Unlike all of the other transformations we've seen so far, this transformation does not return a `DataFrame`. In Python it returns `GroupedData`

`groupBy()` supports the following aggregations:

| Method | Description |
|--------|-------------|
| `avg(..)` | Compute the mean value of each numeric column for each group. |
| `count(..)` | Count the number of rows for each group. |
| `sum(..)` | Compute the sum of each numeric column for each group. |
| `min(..)` | Compute the min value of each numeric column for each group. |
| `max(..)` | Compute the max value of each numeric column for each group. |
| `mean(..)` | Compute the average value of each numeric column for each group (alias of `avg`). |
| `agg(..)` | Compute aggregates by specifying a series of aggregate columns. |
| `pivot(..)` | Pivots a column of the current DataFrame and perform the specified aggregation. |

Functions:
https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#module-pyspark.sql.functions

In [35]:
# Register the DataFrame as a temp view, then aggregate with plain SQL
cdr.createOrReplaceTempView("cdrTable")
minSmsInBySquare = spark.sql("""
  SELECT square_id,
    MIN(sms_in_activity) AS sms_in_activity_min
  FROM cdrTable
  GROUP BY square_id
""")
minSmsInBySquare.show(3)

In [36]:
# Minimum and average incoming-SMS activity per square, via the DataFrame API
smsIn = F.col("sms_in_activity")
smsInStats = cdr.groupby("square_id").agg(
    F.min(smsIn).alias('col1'),
    F.avg(smsIn).alias('col2'),
)
smsInStats.show(3)

In [37]:
import pyspark.sql.functions as F

agg_cols = ['sms_in_activity', 'sms_out_activity','internet_activity']

# One aggregate expression per (function, column): all the minimums first, then all the maximums
minMaxExprs = [
    aggFn(F.col(c)).alias(c + suffix)
    for aggFn, suffix in ((F.min, '_min'), (F.max, '_max'))
    for c in agg_cols
]

cdr.groupby("square_id").agg(*minMaxExprs).show(3)

In [38]:
# In the above examples we used column objects as arguments to the aggregation functions like min, max... Instead, column names can also be passed as arguments.

# Uses the `spark` session created earlier in this notebook rather than the
# legacy `sqlContext` global, which this notebook never defines.
df2 = spark.createDataFrame(
  [('a', 'aa', 1, 11), ('a', 'aa', 2, 12), ('a', 'bb', 3, 13), ('a', 'bb', 4, 14), ('b', 'aa', 5, 15), ('b', 'aa', 6, 16), ('b', 'bb', 7, 17), ('b', 'bb', 8, 18)],
  ('col1', 'col2', 'col3', 'col4')
)
df2.show()

(df2.groupBy(['col1', 'col2'])
  .agg(
    F.sum('col3').alias('sumCol3'), 
    F.avg('col4').alias('avgCol4')
  )
  .sort('col1', 'col2')
  .show()
)

# Apply the same aggregation function to all columns of a df
df2.groupBy('col1').sum().show()
# Notice that sum is being done only on 'col3' and 'col4' and not on 'col2' because 'col2' is of type string and not numeric.

# Count the no of rows in each group
(df2.groupBy(['col1', 'col2'])
  .agg(
    F.count('*').alias('nRows')
  )
  .sort('col1', 'col2')
  .show()
)

# Different column lists can be given to different aggregation functions
minColNames = ['col3', 'col4']
maxColNames = ['col3', 'col4']
(df2.groupBy(['col1', 'col2'])
  .agg(
    *[F.min(c).alias('min_' + c) for c in minColNames],
    *[F.max(c).alias('max_' + c) for c in maxColNames]
  )
  .show()
)

# UDFs

These user-defined functions (udfs) operate one-row-at-a-time, and thus suffer from high serialization and invocation overhead. As a result, many data pipelines define UDFs in Java and Scala and then invoke them from Python.  
This section is presented here only for the sake of completeness. **Use scalar pandas udfs instead of these row-at-a-time udfs.**

In [40]:
import pyspark.sql.functions as F
import pyspark.sql.types as T
import pandas as pd
import string

# example data: one column each of integers, floats and variable-length integer lists
_exampleColumns = {
    'integers': [1, 2, 3],
    'floats': [-1.0, 0.5, 2.7],
    'integer_arrays': [[1, 2], [3, 4, 5], [6, 7, 8, 9]],
}
df_pd = pd.DataFrame(data=_exampleColumns)

# Convert the pandas frame to a Spark DataFrame and inspect it
df = spark.createDataFrame(df_pd)
df.printSchema()
df.show()

def mySquare(x):
  """Return `x` raised to the power of two."""
  squared = pow(x, 2)
  return squared

def returnList(x):
  """Return `[x, x + 1, x * x]` with every element converted to float."""
  return [float(value) for value in (x, x + 1, x * x)]

def intToAscii(number):
  """Pair `number` with the letter at that index of `string.ascii_letters`."""
  letter = string.ascii_letters[number]
  return [number, letter]

# Struct returned by intToAscii: the input number and the letter looked up for it
arraySchema = T.StructType([
    T.StructField('number', T.IntegerType(), nullable=False),
    T.StructField('letters', T.StringType(), nullable=False),
])

# If the return type is not specified while declaring a function as udf, the default datatype of the output will be string.

squareUdfInt = F.udf(mySquare, T.IntegerType())
UdfReturnArray = F.udf(returnList, T.ArrayType(T.FloatType()))
UdfReturnStruct = F.udf(intToAscii, arraySchema)

intSquared = squareUdfInt('integers').alias('intSquared')
floatSquared = squareUdfInt('floats').alias('floatSquared')
df.select('integers', 'floats', intSquared, floatSquared).show()
# floatSquared is null because the udf is declared to return IntegerType but the value it returns is not an integer when 'floats' is the argument to the udf.

# Return type is a composite type. All the elements in the composite type have the same data type (Array).
floatsArray = UdfReturnArray('integers').alias('floatsArray')
df.select('integers', floatsArray).show()


# Return type is composite. Elements of the composite type have different data types.
structCol = UdfReturnStruct('integers').alias('struct')
df.select('integers', structCol).show()

# Pandas UDFs
## Scalar pandas udfs

https://databricks.com/blog/2017/10/30/introducing-vectorized-udfs-for-pyspark.html

They are used for vectorizing scalar operations. To define a scalar Pandas UDF, simply use @pandas_udf to annotate a Python function that takes in `pandas.Series` as arguments and returns another `pandas.Series` of the same size. 

A scalar UDF defines a transformation: **One or more** `pandas.Series` -> A `pandas.Series`. The length of the returned `pandas.Series` must be the same as the length of the input `pandas.Series`.

In [42]:
import pyspark.sql.types as T
import pyspark.sql.functions as F

# 10 million rows: `id` falls into 1000 buckets of 10000 rows each, `v` is uniform random.
# rand() is seeded so that repeated runs of the notebook show the same values.
df = spark.range(0, 10 * 1000 * 1000).withColumn('id', (F.col('id') / 10000).cast('integer')).withColumn('v', F.rand(seed=42))
df.cache()  # the frame is reused by every UDF example below
df.show(5)
print((df.count()))
print(df.select("id").distinct().count())
df.groupBy('id').agg(F.count(F.col('v'))).show(5)

def plusOne(v):
    """Return `v + 1`; `v` may be a scalar or anything supporting `+ 1` (e.g. a pandas.Series)."""
    incremented = v + 1
    return incremented

def addTwoCols(v1, v2):
    """Return `v1 + v2`; works element-wise when both are pandas.Series."""
    total = v1 + v2
    return total
  
# --- plusOne ---
# Row-at-a-time udf: the function receives and returns one scalar value (a double here)
plusOneUdf = F.udf(plusOne, returnType=T.DoubleType())
plusOneRowWise = df.withColumn('v + 1', plusOneUdf(df.v))
plusOneRowWise.show(3)

# Scalar Pandas UDF: the same function now receives and returns a pandas.Series of doubles
plusOnePandasUdf = F.pandas_udf(plusOne, returnType=T.DoubleType(), functionType=F.PandasUDFType.SCALAR)
plusOneVectorized = df.withColumn('v + 1', plusOnePandasUdf(df.v))
plusOneVectorized.show(3)


# --- addTwoCols ---
# Row-at-a-time udf taking two columns
addTwoColsUDF = F.udf(addTwoCols, returnType=T.DoubleType())
sumRowWise = df.withColumn('v + v', addTwoColsUDF(df.v, df.v))
sumRowWise.show(3)

# Scalar Pandas UDF taking two columns
addTwoColsPUDF = F.pandas_udf(addTwoCols, returnType=T.DoubleType(), functionType=F.PandasUDFType.SCALAR)
sumVectorized = df.withColumn("v + v", addTwoColsPUDF(F.col('v'), F.col('v')))
sumVectorized.show(3)

The examples above define a row-at-a-time UDF "plusOneUdf" and a scalar Pandas UDF "plusOnePandasUdf" that perform the same “plus one” computation.  

**In the row-at-a-time version, the user-defined function takes a double “v” and returns the result of “v + 1” as a double. In the Pandas version, the user-defined function takes a `pandas.Series` “v” and returns the result of “v + 1” as a `pandas.Series`. Because “v + 1” is vectorized on `pandas.Series`, the Pandas version is much faster than the row-at-a-time version.**

Note that there are two important requirements when using scalar pandas UDFs:

* The input and output series must have the same size.
* How a column is split into multiple `pandas.Series` is internal to Spark, and therefore the result of user-defined function must be independent of the splitting.

Decorators can be used with python functions instead of declaring user-defined functions:  
```
@F.udf('double')
def plusOne(v):
    return v + 1

@F.pandas_udf("double", F.PandasUDFType.SCALAR)
def pandasPlusOne(v):
    return v + 1
```

Default value of the argument `functionType` for the function `pandas_udf` is `PandasUDFType.SCALAR`.

`MapType`, `StructType` are currently not supported as output types.

## Grouped Map pandas udf

Grouped map Pandas UDFs first split a Spark DataFrame into groups based on the conditions specified in the groupby operator, then apply a user-defined function (`pandas.DataFrame` -> `pandas.DataFrame`) to each group, and finally combine and return the results as a new Spark DataFrame.  
A grouped map UDF defines transformation: A `pandas.DataFrame` -> A `pandas.DataFrame`.  The returnType should be a `StructType` describing the schema of the returned `pandas.DataFrame`. 

The column labels of the returned `pandas.DataFrame` must either match the field names in the defined returnType schema if specified as strings, or match the field data types by position if not strings, e.g. integer indices. 

The length of the returned pandas.DataFrame can be arbitrary.

In [45]:
# Takes a pandas.DataFrame and returns a pandas.DataFrame
def subtractMean(pdf):
    """Return a copy of `pdf` whose `v` column has its own mean subtracted."""
    centered = pdf.v - pdf.v.mean()
    return pdf.assign(v=centered)

# The output has the same columns as the input, so df.schema doubles as the return type
subtractMeanPandasUdf = F.pandas_udf(subtractMean, returnType=df.schema, functionType=F.PandasUDFType.GROUPED_MAP)
demeaned = df.groupby('id').apply(subtractMeanPandasUdf)
demeaned.show(3)


# Row count of the input, followed by the row count after the grouped map
print(df.count())

demeaned.count()

Alternatively, the user can define a function that takes two arguments. In this case, the grouping key(s) will be passed as the first argument and the data will be passed as the second argument. The grouping key(s) will be passed as a tuple (tuple because grouping can be done on more than one column/key) of numpy data types, e.g., `numpy.int32` and `numpy.float64`. The data will still be passed in as a `pandas.DataFrame` containing all columns from the original Spark DataFrame. This is useful when the user does not want to hardcode grouping key(s) in the function.

In [47]:
import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType

_idValuePairs = [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)]
df = spark.createDataFrame(_idValuePairs, ("id", "v"))

@pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
def mean_udf(key, pdf):
    """Return a one-row frame: the grouping key followed by the group's mean of `v`."""
    # key is a tuple of one numpy.int64, which is the value
    # of 'id' for the current group
    group_mean = pdf.v.mean()
    return pd.DataFrame([key + (group_mean,)])

groupMeans = df.groupby('id').apply(mean_udf)
groupMeans.show()
# column names of the output dataFrame are determined by the schema mentioned in the decorator

In the above code note that:
```
>>> (1,) + (2,)
(1, 2)
>>> [(1,) + (2,)]
[(1, 2)]
>>> pd.DataFrame([(1, 2), (3, 4)])
   0  1
0  1  2
1  3  4
```

In [49]:
@pandas_udf("id long, `ceil(v / 2)` long, v double", PandasUDFType.GROUPED_MAP)
def sum_udf(key, pdf):
  """Return a one-row frame: both grouping key values followed by the group's sum of `v`."""
  # key is a tuple of two numpy.int64s, which is the values
  # of 'id' and 'ceil(df.v / 2)' for the current group
  group_sum = pdf.v.sum()
  return pd.DataFrame([key + (group_sum,)])

# Group on a column and on an expression at the same time
groupedByIdAndHalf = df.groupby(df.id, F.ceil(df.v / 2))
groupedByIdAndHalf.apply(sum_udf).show()

In the above code, since id is `[1, 1, 2, 2, 2]` and ceil of v / 2 is `[1, 1, 2, 3, 5]` and grouping is on `[id, ceil(v/2)]`, the keys are `[(1, 1), (2, 2), (2, 3), (2, 5)]`.

## Grouped Aggregate pandas udf
A grouped aggregate UDF defines a transformation: **One or more `pandas.Series` -> A `scalar`**. The returned scalar can be either a python primitive type, e.g., int or float or a numpy data type, e.g., numpy.int64 or numpy.float64.

`MapType` and `StructType` are currently not supported as output types.

Grouped aggregate UDFs are used with `pyspark.sql.GroupedData.agg()` and `pyspark.sql.Window`

In [52]:
@pandas_udf("double", PandasUDFType.GROUPED_AGG)
def mean_udf(v):
    """Reduce one group's values (a pandas.Series) to their mean."""
    group_mean = v.mean()
    return group_mean
meanPerId = df.groupby("id").agg(mean_udf(df['v']))
meanPerId.show()

## Grouped map pandas_udf as substitute for pandas transform

In [54]:
# One string key column followed by two float value columns, all nullable
dfSchema = T.StructType([
  T.StructField('col1', T.StringType(), True),
  T.StructField('col2', T.FloatType(), True),
  T.StructField('col3', T.FloatType(), True),
])

_rows = [('a', 2.0, 3.0), ('a', 5.0, 6.0), ('b', 8.0, 9.0), ('b', 1.0, 3.0)]
df = spark.createDataFrame(data=_rows, schema=dfSchema)
df.show()

# Output schema = input schema + one float column per (statistic, value column) pair
to_append = [
  T.StructField(newName, T.FloatType(), True)
  for newName in (
    "col2_to_mean_col1",
    "col3_to_mean_col1",
    "col2_to_std_col1",
    "col3_to_std_col1",
  )
]
newDfSchema = T.StructType(df.schema.fields + to_append)

@F.pandas_udf(newDfSchema, F.PandasUDFType.GROUPED_MAP)
def transformMean(df):
  """Add, for col2 and col3, the value divided by the group's mean and by the group's std."""
  for stat in ('mean', 'std'):
    for valueCol in ('col2', 'col3'):
      groupStat = getattr(df[valueCol], stat)()
      df[valueCol + '_to_' + stat + '_col1'] = df[valueCol] / groupStat
  return df

df.groupBy('col1').apply(transformMean).show()

### Convert a column in DataFrame into a python list

In [56]:
# One string key column followed by two float value columns, all nullable
dfSchema = T.StructType([
  T.StructField('col1', T.StringType(), True),
  T.StructField('col2', T.FloatType(), True),
  T.StructField('col3', T.FloatType(), True),
])

_rows = [('a', 2.0, 3.0), ('a', 5.0, 6.0), ('b', 8.0, 9.0), ('b', 1.0, 3.0)]
df = spark.createDataFrame(data=_rows, schema=dfSchema)
df.show()

# Collect the single-column rows to the driver and unwrap each Row into its only value
col2Rows = df.select('col2').collect()
column_as_list = [row[0] for row in col2Rows]
column_as_list

### Calculate the no. of missing values

In [58]:

def countNullNan(colName):
  """Build an aggregate Column, aliased to `colName`, counting its null or NaN rows."""
  isMissing = F.isnull(colName) | F.isnan(colName)
  # when() without otherwise() gives null for non-matching rows, and count() ignores nulls
  flagged = F.when(isMissing, 'missingValue')
  return F.count(flagged).alias(colName)

# One missing-value count per column of df
df.select(*map(countNullNan, df.columns)).show()