In [32]:
r_file = sc.textFile("file:///home/victorbdm/assignment_data/*.gz")

In [33]:
r_file = r_file.map(lambda x: x.split(','))

In [34]:
def remove_quotation(x):
    return([xx.replace('"', '') for xx in x])
r_file = r_file.map(remove_quotation)

#### Removing of NAs

In [35]:
r_file = r_file.filter(lambda x:'NA' not in x)

##### Loading Spark dataframe

In [36]:
r_fileDF = spark.createDataFrame(data = r_file.filter(lambda x:x[0]!= 'date'),
                                  schema = r_file.filter(lambda x:x[0]=='date').collect()[0])

#### Importing  libraries and variable conversion

In [37]:
from pyspark.sql.types import DateType, IntegerType

In [38]:
r_fileDF = r_fileDF.withColumn("date", r_fileDF["date"].cast(DateType()))
r_fileDF = r_fileDF.withColumn("size", r_fileDF["size"].cast(IntegerType()))
r_fileDF = r_fileDF.withColumn("ip_id", r_fileDF["ip_id"].cast(IntegerType()))

In [39]:
r_fileDF.createOrReplaceTempView('packages')

In [40]:
r_fileDF.printSchema()

root
 |-- date: date (nullable = true)
 |-- time: string (nullable = true)
 |-- size: integer (nullable = true)
 |-- r_version: string (nullable = true)
 |-- r_arch: string (nullable = true)
 |-- r_os: string (nullable = true)
 |-- package: string (nullable = true)
 |-- version: string (nullable = true)
 |-- country: string (nullable = true)
 |-- ip_id: integer (nullable = true)



#### 1. Total number of download for ggplot and dplyr

In [41]:
gd_package = spark.sql('SELECT package, COUNT(package) AS package_count FROM packages WHERE package in ("ggplot2", "dplyr") GROUP BY package ORDER BY package_count')
gd_package.show()

+-------+-------------+
|package|package_count|
+-------+-------------+
|  dplyr|        13369|
|ggplot2|        39295|
+-------+-------------+



In [42]:
gd_package.select("package", "package_count")\
.write.format("org.apache.spark.sql.cassandra")\
.options(table="gd_package", keyspace="assignment2")\
.save(mode="append")

#### 2. Total download by each operating system

In [43]:
os_package = spark.sql('SELECT r_os, COUNT(r_os) AS os_count FROM packages GROUP BY r_os ORDER BY os_count DESC')
os_package.show()

+---------------+--------+
|           r_os|os_count|
+---------------+--------+
|        mingw32| 1111764|
|      linux-gnu|  519725|
|     darwin17.0|  364260|
|       darwin20|   43771|
|   darwin15.6.0|   25604|
|   darwin13.4.0|    5675|
|   darwin20.6.0|    3178|
|   darwin20.4.0|    1434|
|     linux-musl|    1040|
|   darwin19.6.0|     708|
|   darwin21.1.0|     329|
|linux-gnueabihf|     301|
|   darwin20.5.0|      85|
|   darwin20.3.0|      83|
|   darwin19.5.0|      64|
|   darwin20.2.0|      60|
|   darwin18.7.0|      42|
|   darwin20.1.0|      31|
|   darwin11.4.2|      20|
|   darwin19.2.0|       6|
+---------------+--------+
only showing top 20 rows



In [44]:
os_package.select("r_os", "os_count")\
.write.format("org.apache.spark.sql.cassandra")\
.options(table="os_package", keyspace="assignment2")\
.save(mode="append")

#### 3. Top 10 distinct large package

In [45]:
top_package = spark.sql('SELECT DISTINCT package, MAX(size) AS package_size FROM packages GROUP BY package ORDER BY package_size DESC ')
top_package.show(10)

+---------+------------+
|  package|package_size|
+---------+------------+
|      h2o|   178032483|
|gdalcubes|   113334894|
|    terra|   112345795|
|       sf|   106864613|
|    rgdal|   104486593|
|   vapour|   101826642|
|     apcf|    98560474|
|     Boom|    84741330|
|   mlpack|    60423534|
|    torch|    49660349|
+---------+------------+
only showing top 10 rows



In [46]:
top_package.select("package", "package_size")\
.write.format("org.apache.spark.sql.cassandra")\
.options(table="top_package", keyspace="assignment2")\
.save(mode="append")

#### 4. Top 10 least popular package

In [47]:
least_package = spark.sql('SELECT DISTINCT package, COUNT(package) AS package_count FROM packages GROUP BY package ORDER BY package_count ASC')
least_package.show(10)

+------------+-------------+
|     package|package_count|
+------------+-------------+
|      cuml4r|            1|
|     lazyrmd|            2|
| c2d4u.tools|            2|
|       CommT|            2|
|        exif|            2|
|    idmTPreg|            2|
|heatmap.plus|            2|
| packagedocs|            2|
|   gitgadget|            2|
|        bspm|            2|
+------------+-------------+
only showing top 10 rows



In [48]:
least_package.select("package", "package_count")\
.write.format("org.apache.spark.sql.cassandra")\
.options(table="least_package", keyspace="assignment2")\
.save(mode="append")

#### 5. Most downloading hour

In [49]:
download_time = spark.sql('SELECT time, COUNT(time) AS time_count FROM packages GROUP BY time ORDER BY time_count DESC')
download_time.show(1)

+--------+----------+
|    time|time_count|
+--------+----------+
|04:47:56|       205|
+--------+----------+
only showing top 1 row



In [50]:
download_time.select("time", "time_count")\
.write.format("org.apache.spark.sql.cassandra")\
.options(table="download_time", keyspace="assignment2")\
.save(mode="append")

#### 6. Top 5 package in US

In [51]:
us_package = spark.sql('SELECT country, package, COUNT(package) AS package_count FROM packages WHERE country= "US" GROUP BY package, country ORDER BY package_count DESC')
us_package.show(5)

+-------+--------+-------------+
|country| package|package_count|
+-------+--------+-------------+
|     US|  crayon|         7956|
|     US|   rlang|         7604|
|     US|     cli|         6979|
|     US|   vctrs|         6408|
|     US|ellipsis|         6389|
+-------+--------+-------------+
only showing top 5 rows



In [52]:
us_package.select("country", "package", "package_count")\
.write.format("org.apache.spark.sql.cassandra")\
.options(table="us_package", keyspace="assignment2")\
.save(mode="append")

#### 7. Packages downloaded by the machine with higher download

In [53]:
machine_package = spark.sql('SELECT package, count(package) AS package_download FROM packages WHERE r_arch = (SELECT max(r_arch) FROM packages) GROUP BY package order by package_download desc')
machine_package.show()

+-----------+----------------+
|    package|package_download|
+-----------+----------------+
|       ragg|           50700|
|textshaping|           50306|
|    ggplot2|           38809|
|   devtools|           28498|
|      Hmisc|           28216|
|         sf|           26546|
|      units|           26120|
|      rgeos|           25525|
|    pkgdown|           25279|
|        cli|           17403|
|      rlang|           17321|
|     crayon|           15115|
|  lifecycle|           13301|
|     pillar|           13234|
|     tibble|           13220|
|      dplyr|           12950|
|      vctrs|           12891|
|   ellipsis|           12745|
|       glue|           11844|
|   magrittr|           11761|
+-----------+----------------+
only showing top 20 rows



In [54]:
machine_package.select("package", "package_download")\
.write.format("org.apache.spark.sql.cassandra")\
.options(table="machine_package", keyspace="assignment2")\
.save(mode="append")

#### 8. Top 3 OS used by R programmers

In [55]:
popular_os = spark.sql('SELECT r_os, COUNT(r_os) AS os_count FROM packages GROUP BY r_os ORDER BY os_count DESC')
popular_os.show(3)

+----------+--------+
|      r_os|os_count|
+----------+--------+
|   mingw32| 1111764|
| linux-gnu|  519725|
|darwin17.0|  364260|
+----------+--------+
only showing top 3 rows



In [56]:
popular_os.select("r_os", "os_count")\
.write.format("org.apache.spark.sql.cassandra")\
.options(table="popular_os", keyspace="assignment2")\
.save(mode="append")

#### 9. R user that uses 32 bit 

In [57]:
r_32bit = spark.sql('SELECT r_arch, COUNT(r_arch) AS machine_count FROM packages WHERE r_arch="i386" GROUP BY r_arch ORDER BY machine_count DESC')
r_32bit.show(3)

+------+-------------+
|r_arch|machine_count|
+------+-------------+
|  i386|        27317|
+------+-------------+



In [58]:
r_32bit.select("r_arch", "machine_count")\
.write.format("org.apache.spark.sql.cassandra")\
.options(table="r_32bit", keyspace="assignment2")\
.save(mode="append")

#### 10. Total download by each country

In [59]:
country = spark.sql('SELECT DISTINCT country, COUNT(country) AS country_count FROM packages GROUP BY country ORDER BY country_count ASC')
country.show()

+-------+-------------+
|country|country_count|
+-------+-------------+
|     PW|            1|
|     AD|            1|
|     TC|            1|
|     SR|            2|
|     CV|            3|
|     AP|            3|
|     LR|            3|
|     LI|            3|
|     GA|            3|
|     MC|            3|
|     SB|            4|
|     MD|            5|
|     BM|            6|
|     YE|            6|
|     RE|            6|
|     KG|            7|
|     BB|            8|
|     SO|            8|
|     DM|            8|
|     A2|           18|
+-------+-------------+
only showing top 20 rows



In [60]:
country.select("country", "country_count")\
.write.format("org.apache.spark.sql.cassandra")\
.options(table="country", keyspace="assignment2")\
.save(mode="append")