<h1>Table of Contents<span class="tocSkip"></span></h1>
<div class="toc"><ul class="toc-item"><li><span><a href="#count-distinct" data-toc-modified-id="count-distinct-1"><span class="toc-item-num">1&nbsp;&nbsp;</span>count distinct</a></span></li><li><span><a href="#Get-distinct-column-values" data-toc-modified-id="Get-distinct-column-values-2"><span class="toc-item-num">2&nbsp;&nbsp;</span>Get distinct column values</a></span></li><li><span><a href="#Group-by-one-column-and-filter-rows-with-maximum" data-toc-modified-id="Group-by-one-column-and-filter-rows-with-maximum-3"><span class="toc-item-num">3&nbsp;&nbsp;</span>Group by one column and filter rows with maximum</a></span><ul class="toc-item"><li><span><a href="#using-leftsemi-join" data-toc-modified-id="using-leftsemi-join-3.1"><span class="toc-item-num">3.1&nbsp;&nbsp;</span>using leftsemi join</a></span></li><li><span><a href="#using-sql" data-toc-modified-id="using-sql-3.2"><span class="toc-item-num">3.2&nbsp;&nbsp;</span>using sql</a></span></li><li><span><a href="#using-pandas" data-toc-modified-id="using-pandas-3.3"><span class="toc-item-num">3.3&nbsp;&nbsp;</span>using pandas</a></span></li></ul></li><li><span><a href="#Get-average-after-filtering" data-toc-modified-id="Get-average-after-filtering-4"><span class="toc-item-num">4&nbsp;&nbsp;</span>Get average after filtering</a></span></li></ul></div>

In [1]:
import numpy as np
import pandas as pd
import pyspark
from pyspark import SparkConf, SparkContext, SQLContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf # @udf("integer") def myfunc(x,y): return x - y
from pyspark.sql import functions as F # stddev format_number date_format, dayofyear, when
from pyspark.sql.types import StructField, StringType, IntegerType, StructType

# Record library versions so the outputs below can be reproduced.
library_versions = [(module.__name__, module.__version__) for module in (np, pd, pyspark)]
print(library_versions)

# One SparkSession for the whole notebook; the SparkContext and an SQLContext are derived from it.
spark = (
    SparkSession.builder
    .appName('example')
    .getOrCreate()
)
sc = spark.sparkContext
sqlContext = SQLContext(sc)
sc.setLogLevel("INFO")

[('numpy', '1.17.1'), ('pandas', '0.25.1'), ('pyspark', '2.4.4')]


# count distinct

In [34]:
# Toy (year, id) pairs with repeated rows, so distinct and total counts differ.
data = [
    ("2001", "id1"), ("2002", "id1"),
    ("2002", "id1"), ("2001", "id1"),
    ("2001", "id2"), ("2001", "id2"),
    ("2002", "id2"), ("2003", "id1"),
]

df = spark.createDataFrame(data, schema=["year", "id"])
df.show()

+----+---+
|year| id|
+----+---+
|2001|id1|
|2002|id1|
|2002|id1|
|2001|id1|
|2001|id2|
|2001|id2|
|2002|id2|
|2003|id1|
+----+---+



In [24]:
# countDistinct skips nulls: a null id never contributes to the distinct count.
ids_per_year = F.countDistinct("id").alias('count')
df.groupBy("year").agg(ids_per_year).show()

+----+-----+
|year|count|
+----+-----+
|2002|    2|
|2001|    2|
|2003|    1|
+----+-----+



In [32]:
# Total number of rows, duplicates included.
df.count()

8

In [33]:
# Number of unique rows; dropDuplicates() with no subset is equivalent to distinct().
df.dropDuplicates().count()

5

In [23]:
# Global (ungrouped) distinct count of years.
df.agg(F.countDistinct('year')).show()

+--------------------+
|count(DISTINCT year)|
+--------------------+
|                   3|
+--------------------+



In [8]:
# Same rows as a pandas DataFrame, for comparison with the Spark results.
dfp = pd.DataFrame.from_records(data, columns=['year', 'id'])
dfp

Unnamed: 0,year,id
0,2001,id1
1,2002,id1
2,2002,id1
3,2001,id1
4,2001,id2
5,2001,id2
6,2002,id2
7,2003,id1


In [9]:
dfp.groupby(['year'])['id'].count().reset_index()

Unnamed: 0,year,id
0,2001,4
1,2002,3
2,2003,1


In [10]:
dfp.groupby('year').agg({'id': 'count'})

Unnamed: 0_level_0,id
year,Unnamed: 1_level_1
2001,4
2002,3
2003,1


# Get distinct column values

In [27]:
# One row per unique year.
x = df.select(F.col('year')).distinct()
x.show()

+----+
|year|
+----+
|2002|
|2001|
|2003|
+----+



In [28]:
# Bring the distinct years back to the driver as a list of Row objects.
distinct_year_rows = x.collect()
distinct_year_rows

[Row(year='2002'), Row(year='2001'), Row(year='2003')]

In [30]:
# dropDuplicates restricted to a column subset; on this one-column frame it matches distinct().
x.dropDuplicates(subset=['year']).show()

+----+
|year|
+----+
|2002|
|2001|
|2003|
+----+



# Group by one column and filter rows with maximum

In [37]:
# Rows of (group A, value B, label C); each group has a single row holding its max B.
data = [
    ('a', 5, 'c'), ('a', 8, 'd'), ('a', 7, 'e'),
    ('b', 1, 'f'), ('b', 3, 'g'),
]

df = spark.createDataFrame(data, schema=['A', 'B', 'C'])
df.show()

+---+---+---+
|  A|  B|  C|
+---+---+---+
|  a|  5|  c|
|  a|  8|  d|
|  a|  7|  e|
|  b|  1|  f|
|  b|  3|  g|
+---+---+---+



In [40]:
# GroupedData.max is shorthand for agg({'B': 'max'}) and yields the same max(B) column.
df.groupBy('A').max('B').show()

+---+------+
|  A|max(B)|
+---+------+
|  b|     3|
|  a|     8|
+---+------+



In [39]:
# Column-expression form of the same aggregation.
df.groupBy(F.col('A')).agg(F.max(F.col('B'))).show()

+---+------+
|  A|max(B)|
+---+------+
|  b|     3|
|  a|     8|
+---+------+



## using leftsemi join

In [41]:
# Goal: keep the whole row (including column C) holding the maximum B within each A

In [43]:
df.join(df.groupBy('A').agg(F.max('B').alias('B')),on='B',how='leftsemi').show()

+---+---+---+
|  B|  A|  C|
+---+---+---+
|  3|  b|  g|
|  8|  a|  d|
+---+---+---+



## using sql

In [44]:
# Register df as the temporary SQL view "table" so the query below can refer to it.
df.createOrReplaceTempView(name="table")

In [46]:
# For each A keep the full row(s) whose B equals that group's maximum.
# LEFT SEMI JOIN returns only the left side's columns, filtered to rows that have a match.
q = '''SELECT *
FROM table a LEFT SEMI
JOIN (
    SELECT 
        A,
        max(B) as max_B
    FROM table
    GROUP BY A
    ) t
ON a.A=t.A AND a.B=t.max_B
'''

results = spark.sql(sqlQuery=q)
results.show()

+---+---+---+
|  A|  B|  C|
+---+---+---+
|  b|  3|  g|
|  a|  8|  d|
+---+---+---+



## using pandas

In [49]:
# Pull the small Spark DataFrame onto the driver as a pandas DataFrame.
dfp = df.toPandas()
dfp

Unnamed: 0,A,B,C
0,a,5,c
1,a,8,d
2,a,7,e
3,b,1,f
4,b,3,g


In [52]:
pd.merge(dfp.groupby('A')['B'].max(), dfp)

Unnamed: 0,B,A,C
0,8,a,d
1,3,b,g


In [53]:
dfp.groupby('A')['B'].max().to_frame().merge(dfp)

Unnamed: 0,B,A,C
0,8,a,d
1,3,b,g


In [58]:
# as_index=False keeps A as a column, so merge() matches on both A and B.
max_b_per_a = dfp.groupby('A', as_index=False)['B'].max()
max_b_per_a.merge(dfp)

Unnamed: 0,A,B,C
0,a,8,d
1,b,3,g


# Get average after filtering

In [61]:
# Re-display the input (first 20 rows, the show() default) before adding a column.
df.show(n=20)

+---+---+---+
|  A|  B|  C|
+---+---+---+
|  a|  5|  c|
|  a|  8|  d|
|  a|  7|  e|
|  b|  1|  f|
|  b|  3|  g|
+---+---+---+



In [65]:
# Add an 'age' column that copies B, to average over below.
df = df.withColumn('age', F.col('B'))
df.show()

+---+---+---+---+
|  A|  B|  C|age|
+---+---+---+---+
|  a|  5|  c|  5|
|  a|  8|  d|  8|
|  a|  7|  e|  7|
|  b|  1|  f|  1|
|  b|  3|  g|  3|
+---+---+---+---+



In [66]:
# where() is an alias of filter(); the dict form of agg() names the result avg(age).
df.where(df.B > 5).agg({'age': 'avg'}).show()

+--------+
|avg(age)|
+--------+
|     7.5|
+--------+



In [68]:
# Same filter-then-average using column functions instead of a dict.
df.filter(F.col('B') > 5).agg(F.avg('age')).show()

+--------+
|avg(age)|
+--------+
|     7.5|
+--------+



In [69]:
from pyspark.sql.functions import avg, col, when

In [72]:
# Conditional average without filtering rows: when() yields null for B <= 5,
# and avg() skips nulls, so only rows with B > 5 contribute.
age_if_b_gt_5 = when(df['B'] > 5, df['age'])
df.select(avg(age_if_b_gt_5)).show()

+-----------------------------------+
|avg(CASE WHEN (B > 5) THEN age END)|
+-----------------------------------+
|                                7.5|
+-----------------------------------+

