# Analyzing R package download frequency
Based on https://github.com/XD-DENG/Spark-practice, this notebook is licensed under [CC BY-NC-SA 4.0](https://creativecommons.org/licenses/by-nc-sa/4.0/).

In [35]:
import os
from pyspark.context import SparkContext

os.environ["PYTHONHASHSEED"] = "0"

## Download data

In [21]:
!wget http://cran-logs.rstudio.com/2023/2023-11-12.csv.gz

--2023-11-13 21:44:58--  http://cran-logs.rstudio.com/2023/2023-11-12.csv.gz
Resolving cran-logs.rstudio.com (cran-logs.rstudio.com)... 52.92.180.187, 52.218.242.178, 52.218.178.98, ...
Connecting to cran-logs.rstudio.com (cran-logs.rstudio.com)|52.92.180.187|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 46446548 (44M) [application/gzip]
Saving to: ‘2023-11-12.csv.gz’


2023-11-13 21:45:42 (1.03 MB/s) - ‘2023-11-12.csv.gz’ saved [46446548/46446548]



In [3]:
sc = SparkContext('local', 'r-packages')

## Import data into a Resilient Distributed Dataset (RDD)

In [22]:
# The Spark context `sc` was created in the cell above.
# Note that we don't have to unzip the data file; `sc.textFile` decompresses it for us.
# `sc.textFile` produces an RDD with one element per line of text.

raw_content = sc.textFile("./2023-11-12.csv.gz")
type(raw_content)

pyspark.rdd.RDD

## Data overview

In [23]:
# Let's have a look at a few lines.
raw_content.take(5)

['"date","time","size","r_version","r_arch","r_os","package","version","country","ip_id"',
 '"2023-11-12","10:49:26",315371,"4.3.2","x86_64","mingw32","gower","1.0.1","AU",1',
 '"2023-11-12","10:49:24",2563987,"4.3.2","x86_64","mingw32","lava","1.7.3","AU",1',
 '"2023-11-12","10:49:21",677249,"4.3.2","x86_64","mingw32","future","1.33.0","AU",1',
 '"2023-11-12","10:49:26",1106170,"4.2.1","x86_64","mingw32","jsonlite","1.8.7","CN",2']

In [24]:
# How many data lines do we have in total?
raw_content.count()

3523030

## Intermezzo 1: Set Operations

In [12]:
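# Combine two sets. Combining a set with itself yields twice as many rows.
# The counts in this section appear to stem from an earlier run on a different file,
# so compare them with each other rather than with the `count()` output above.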
raw_content.union(raw_content).count()

28666

In [13]:
# Keep only rows that occur in both sets. Note that this yields fewer rows than `raw_content.count()`, because duplicate lines are removed.
raw_content.intersection(raw_content).count()

14327

In [14]:
# Intersecting a set with itself gives the same result as `distinct()`, which keeps only unique rows.
raw_content.distinct().count()

14327
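The three set operations differ mainly in how they treat duplicate rows. The following is a plain-Python sketch of that behaviour, not Spark code: the helper functions and the sample rows are made up for illustration, and unlike Spark they preserve input order.

```python
def union(left, right):
    """Like RDD.union: concatenate both inputs and keep duplicates."""
    return list(left) + list(right)


def distinct(rows):
    """Like RDD.distinct: keep one copy of each row."""
    return list(dict.fromkeys(rows))


def intersection(left, right):
    """Like RDD.intersection: rows present in both inputs, without duplicates."""
    right_set = set(right)
    return [row for row in distinct(left) if row in right_set]


rows = ["a", "b", "b", "c"]  # made-up sample containing one duplicate line

union_count = len(union(rows, rows))
intersection_count = len(intersection(rows, rows))
distinct_count = len(distinct(rows))
print(union_count, intersection_count, distinct_count)
```

With four input rows, one of them a duplicate, the union has eight rows while the intersection and the distinct set both have three.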

## Data cleaning

In [25]:
# Split each line of raw_content into a list of fields.
content = raw_content.map(lambda x: x.split(','))

In [26]:
# The lines are split now.
content.take(5)

[['"date"',
  '"time"',
  '"size"',
  '"r_version"',
  '"r_arch"',
  '"r_os"',
  '"package"',
  '"version"',
  '"country"',
  '"ip_id"'],
 ['"2023-11-12"',
  '"10:49:26"',
  '315371',
  '"4.3.2"',
  '"x86_64"',
  '"mingw32"',
  '"gower"',
  '"1.0.1"',
  '"AU"',
  '1'],
 ['"2023-11-12"',
  '"10:49:24"',
  '2563987',
  '"4.3.2"',
  '"x86_64"',
  '"mingw32"',
  '"lava"',
  '"1.7.3"',
  '"AU"',
  '1'],
 ['"2023-11-12"',
  '"10:49:21"',
  '677249',
  '"4.3.2"',
  '"x86_64"',
  '"mingw32"',
  '"future"',
  '"1.33.0"',
  '"AU"',
  '1'],
 ['"2023-11-12"',
  '"10:49:26"',
  '1106170',
  '"4.2.1"',
  '"x86_64"',
  '"mingw32"',
  '"jsonlite"',
  '"1.8.7"',
  '"CN"',
  '2']]

In [27]:
def clean(line):
    """Remove double quotation marks from all fields of a line."""
    return [item.replace('"', '') for item in line]

content = content.map(clean)

# Now it looks much better.
content.take(5)

[['date',
  'time',
  'size',
  'r_version',
  'r_arch',
  'r_os',
  'package',
  'version',
  'country',
  'ip_id'],
 ['2023-11-12',
  '10:49:26',
  '315371',
  '4.3.2',
  'x86_64',
  'mingw32',
  'gower',
  '1.0.1',
  'AU',
  '1'],
 ['2023-11-12',
  '10:49:24',
  '2563987',
  '4.3.2',
  'x86_64',
  'mingw32',
  'lava',
  '1.7.3',
  'AU',
  '1'],
 ['2023-11-12',
  '10:49:21',
  '677249',
  '4.3.2',
  'x86_64',
  'mingw32',
  'future',
  '1.33.0',
  'AU',
  '1'],
 ['2023-11-12',
  '10:49:26',
  '1106170',
  '4.2.1',
  'x86_64',
  'mingw32',
  'jsonlite',
  '1.8.7',
  'CN',
  '2']]
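Splitting on `,` and stripping quotes works for this file, but it would break on a quoted field that itself contains a comma. As a hedged alternative, the sketch below uses Python's standard `csv` module to parse a single line. The function name `parse_line` and the `tricky` sample are made up for illustration.

```python
import csv


def parse_line(line):
    """Split one CSV line into fields, honouring double-quoted values."""
    return next(csv.reader([line]))


# First data line from the output above.
sample = '"2023-11-12","10:49:26",315371,"4.3.2","x86_64","mingw32","gower","1.0.1","AU",1'
fields = parse_line(sample)
print(fields[6], fields[8])

# Hypothetical line with a comma inside a quoted field.
tricky = '"a,b","c"'
tricky_fields = parse_line(tricky)
print(tricky_fields)
```

Assuming the function is defined in the notebook, `raw_content.map(parse_line)` should be able to replace the separate split and clean steps.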

## Finding and Counting

In [28]:
# How often was 'ggplot2' downloaded?
ggplot2 = content.filter(lambda x: x[6] == 'ggplot2').count()
ggplot2

78975

In [29]:
# You can filter for multiple fields at the same time.
ggplot2_nl = content.filter(lambda x: x[6] == 'ggplot2' and x[8] == 'NL')
ggplot2_nl

PythonRDD[35] at RDD at PythonRDD.scala:53

In [30]:
# Let's look at the filtered rows themselves.
ggplot2_nl.take(2)

[['2023-11-12',
  '10:51:53',
  '4300000',
  '4.3.2',
  'x86_64',
  'mingw32',
  'ggplot2',
  '3.4.4',
  'NL',
  '280'],
 ['2023-11-12',
  '22:59:49',
  '3163440',
  '4.3.1',
  'x86_64',
  'linux-gnu',
  'ggplot2',
  '3.4.4',
  'NL',
  '600']]

## Reduce
Filtering works nicely, but let's aggregate and count _all_ packages instead.

In [31]:
# A common way to "reduce" in Spark: Tie a field containing "1" to all lines and sum these fields per key.
package_count = content.map(lambda x: (x[6], 1)).reduceByKey(lambda a,b: a+b)
type(package_count)

pyspark.rdd.PipelinedRDD

In [32]:
# This is what the aggregate looks like now. Note that the CSV header row is counted too, as ('package', 1).
package_count.take(5)

[('package', 1),
 ('gower', 4776),
 ('lava', 5720),
 ('future', 6171),
 ('jsonlite', 19287)]
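To make the map and reduce steps more tangible, here is a plain-Python sketch of what `reduceByKey` computes conceptually. The helper `reduce_by_key` and the sample list are made up for illustration. Spark itself merges values per partition before combining partitions, which is why the function passed in should be associative and commutative.

```python
from functools import reduce


def reduce_by_key(pairs, func):
    """Merge all values that share a key by repeatedly applying func."""
    grouped = {}
    for key, value in pairs:
        grouped.setdefault(key, []).append(value)
    return {key: reduce(func, values) for key, values in grouped.items()}


packages = ["gower", "lava", "gower", "jsonlite", "gower"]  # made-up sample
pairs = [(name, 1) for name in packages]  # the "map" step: attach a 1 to each row
counts = reduce_by_key(pairs, lambda a, b: a + b)  # the "reduce" step: sum per key
print(counts)
```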

In [36]:
# We can still query the count of single packages.
package_count.lookup("XML")

[4083]

## Intermezzo 2: Joins
See https://javarevisited.blogspot.com/2013/05/difference-between-left-and-right-outer-join-sql-mysql.html for details about join flavours.

In [37]:
# Let's split out the country to use as a key.
content_modified = content.map(lambda x: (x[8], x))
content_modified.take(2)

[('country',
  ['date',
   'time',
   'size',
   'r_version',
   'r_arch',
   'r_os',
   'package',
   'version',
   'country',
   'ip_id']),
 ('AU',
  ['2023-11-12',
   '10:49:26',
   '315371',
   '4.3.2',
   'x86_64',
   'mingw32',
   'gower',
   '1.0.1',
   'AU',
   '1'])]

In [38]:
# Your mapping list can be as long as you like. As an example, we take two countries.
mapping = [('NL', 'Netherlands'), ('US', 'United States')]

In [39]:
# Distribute the mapping to form an RDD.
mapping_rdd = sc.parallelize(mapping)

In [40]:
# For comparison with below commands: How many lines does `content_modified` have?
content_modified.count()

3523030

In [41]:
# Performing an "inner join" operation. This is _shorter_ than `content_modified`, since an "inner join" only produces lines that share a key in both `content_modified` and `mapping_rdd`.
join = content_modified.join(mapping_rdd)
join.count()

1048923

In [None]:
# This is what the data looks like. Notice that _all_ entries have matched with `mapping_rdd` and contain the country name as the last field.
join.takeSample(withReplacement=False, num=2)

In [None]:
# Performing a "left outer join" operation. This is exactly as long as `content_modified`, since the key only has to appear in the "left" (`content_modified`) set, and not necessarily in `mapping_rdd`.
left_outer_join = content_modified.leftOuterJoin(mapping_rdd)
left_outer_join.count()

In [None]:
# Notice that not all lines have matched with `mapping_rdd`. The unmatched lines contain `None` as the last item.
left_outer_join.takeSample(False, 8)
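The difference between the two join flavours can be reproduced on a handful of rows. The following plain-Python sketch mirrors the shape of the results, `(key, (left_value, right_value))`. The helper functions and the sample rows are made up for illustration, and Spark does not guarantee any particular row order.

```python
def inner_join(left, right):
    """Pair up values whose key occurs on both sides, like RDD.join."""
    return [(k, (lv, rv)) for k, lv in left for rk, rv in right if k == rk]


def left_outer_join(left, right):
    """Keep every left row; use None where the right side has no match."""
    right_keys = {k for k, _ in right}
    unmatched = [(k, (lv, None)) for k, lv in left if k not in right_keys]
    return inner_join(left, right) + unmatched


downloads = [("NL", "ggplot2"), ("AU", "gower"), ("US", "lava")]  # made-up rows
names = [("NL", "Netherlands"), ("US", "United States")]

inner = inner_join(downloads, names)
outer = left_outer_join(downloads, names)
print(inner)
print(outer)
```

The left outer join has exactly as many rows as the left input only because each key appears once in the mapping. A key that occurred twice on the right would duplicate the matching left rows.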

## Sorting

In [None]:
# Let's create a Top 10 list of downloaded R packages.
top_10 = package_count.map(lambda x: (x[1], x[0])).sortByKey(ascending=False).take(10)
top_10
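Sorting every package just to keep ten of them does more work than necessary; a bounded selection is enough. The sketch below shows the idea in plain Python with `heapq.nlargest` on the five pairs printed earlier, and it also drops the header row. In Spark, `RDD.top` and `RDD.takeOrdered` accept a `key` function and should serve the same purpose, although they are not used in this notebook.

```python
import heapq

# The five (package, count) pairs shown in the output of package_count.take(5).
package_counts = [
    ("package", 1),
    ("gower", 4776),
    ("lava", 5720),
    ("future", 6171),
    ("jsonlite", 19287),
]

# Drop the CSV header row, which was counted as a package named "package".
without_header = [(name, n) for name, n in package_counts if name != "package"]

# Select the three largest counts without sorting the whole list.
top_3 = heapq.nlargest(3, without_header, key=lambda pair: pair[1])
print(top_3)
```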

## Plotting the Top 10

In [None]:
# Let's plot the Top 10 by hand with matplotlib; it's not very difficult.
from matplotlib import pyplot as plt

In [None]:
data = list(zip(*top_10))  # Transpose the list.
data.reverse()  # Swap the two rows so that names come first and counts second, as plt.bar expects.
data

In [None]:
plt.bar(*data)
plt.xticks(rotation=-30)
plt.xlabel("Package name")
plt.ylabel("Downloads per day")

## Bonus

- What were the 10 most downloaded packages last month?
- What were the 10 most downloaded packages last year?

You can find all data here: http://cran-logs.rstudio.com/
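As a starting point for the bonus questions, the sketch below builds the list of daily log URLs for one month. It assumes that every day follows the same URL pattern as the file downloaded at the top of this notebook. The function name `daily_log_urls` is made up for illustration.

```python
from datetime import date, timedelta

BASE_URL = "http://cran-logs.rstudio.com"


def daily_log_urls(year, month):
    """Return one log URL per day of the given month."""
    day = date(year, month, 1)
    urls = []
    while day.month == month:
        # Assumed pattern: <base>/<year>/<YYYY-MM-DD>.csv.gz
        urls.append(f"{BASE_URL}/{day.year}/{day.isoformat()}.csv.gz")
        day += timedelta(days=1)
    return urls


urls = daily_log_urls(2023, 10)
print(len(urls), urls[0])
```

Once the files are downloaded into one directory, a wildcard path such as `sc.textFile("./2023-10-*.csv.gz")` should read them into a single RDD, so the rest of the analysis can stay unchanged.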