In [1]:
# Use the Azure Machine Learning data source package
from azureml.dataprep import datasource

# Use the Azure Machine Learning data collector to log various metrics
from azureml.logging import get_azureml_logger
logger = get_azureml_logger()


In [2]:
# Use Azure Machine Learning history magic to control history collection
# History is off by default, options are "on", "off", or "show"
# %azureml history on


In [3]:
# This call will load the referenced data source and return a DataFrame.
# If run in a PySpark environment, this call returns a
# Spark DataFrame. If not, it returns a Pandas DataFrame.
df = datasource.load_datasource('ratings.dsource')

# Remove this line and add code that uses the DataFrame
df.head(10)


[Row(userId=1.0, movieId=31.0, rating=2.5, timestamp=1260759144.0),
 Row(userId=1.0, movieId=1029.0, rating=3.0, timestamp=1260759179.0),
 Row(userId=1.0, movieId=1061.0, rating=3.0, timestamp=1260759182.0),
 Row(userId=1.0, movieId=1129.0, rating=2.0, timestamp=1260759185.0),
 Row(userId=1.0, movieId=1172.0, rating=4.0, timestamp=1260759205.0),
 Row(userId=1.0, movieId=1263.0, rating=2.0, timestamp=1260759151.0),
 Row(userId=1.0, movieId=1287.0, rating=2.0, timestamp=1260759187.0),
 Row(userId=1.0, movieId=1293.0, rating=2.0, timestamp=1260759148.0),
 Row(userId=1.0, movieId=1339.0, rating=3.5, timestamp=1260759125.0),
 Row(userId=1.0, movieId=1343.0, rating=2.0, timestamp=1260759131.0)]

In [4]:
import pyspark

from pyspark.ml.tuning import *
from pyspark.sql.types import *

In [5]:
from pyspark.ml.recommendation import ALS
als = ALS() \
    .setUserCol("userId") \
    .setRatingCol("rating") \
    .setItemCol("movieId") \

alsModel = als.fit(df)

In [6]:
alsModel.save("./outputs/model")

In [7]:
!ls outputs/model

itemFactors  metadata  userFactors


In [14]:
from pyspark.ml.recommendation import ALSModel
newModel = ALSModel.load("./outputs/model")

In [20]:
score = newModel.transform(df)

In [21]:
score.take(10)

[Row(userId=1.0, movieId=31.0, rating=2.5, timestamp=1260759144.0, prediction=2.422898530960083),
 Row(userId=1.0, movieId=1029.0, rating=3.0, timestamp=1260759179.0, prediction=2.928335189819336),
 Row(userId=1.0, movieId=1061.0, rating=3.0, timestamp=1260759182.0, prediction=2.8850228786468506),
 Row(userId=1.0, movieId=1129.0, rating=2.0, timestamp=1260759185.0, prediction=2.0208959579467773),
 Row(userId=1.0, movieId=1172.0, rating=4.0, timestamp=1260759205.0, prediction=3.5426249504089355),
 Row(userId=1.0, movieId=1263.0, rating=2.0, timestamp=1260759151.0, prediction=2.0792593955993652),
 Row(userId=1.0, movieId=1287.0, rating=2.0, timestamp=1260759187.0, prediction=2.267232656478882),
 Row(userId=1.0, movieId=1293.0, rating=2.0, timestamp=1260759148.0, prediction=2.3691368103027344),
 Row(userId=1.0, movieId=1339.0, rating=3.5, timestamp=1260759125.0, prediction=3.2346277236938477),
 Row(userId=1.0, movieId=1343.0, rating=2.0, timestamp=1260759131.0, prediction=2.27142477035522

In [6]:
userRecs = alsModel.recommendForAllUsers(1000)

In [28]:
!ls ./outputs/

model  model.pkl  userrecs.parquet


In [5]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

In [39]:
reloadUserRecs = spark.read.parquet("./outputs/userrecs.parquet")

In [44]:
reloadUserRecs.toPandas().loc[reloadUserRecs.toPandas()['userId']==55]

Unnamed: 0,userId,recommendations
0,55,"[(2202, 5.086619853973389), (8874, 4.972634315..."


In [55]:
cSchema = StructType([StructField("userId", IntegerType()),
                      StructField("itemID", IntegerType()),
                      StructField("rating", IntegerType()),
                      StructField("notTime", IntegerType())])

ratings = spark.createDataFrame([
    (0, 1, 4, 4),
    (0, 3, 1, 1),
    (0, 4, 5, 5),
    (0, 5, 3, 3),
    (0, 7, 3, 3),
    (0, 9, 3, 3),
    (0, 10, 3, 3),
    (1, 1, 4, 4),
    (1, 2, 5, 5),
    (1, 3, 1, 1),
    (1, 6, 4, 4),
    (1, 7, 5, 5),
    (1, 8, 1, 1),
    (1, 10, 3, 3),
    (2, 1, 4, 4),
    (2, 2, 1, 1),
    (2, 3, 1, 1),
    (2, 4, 5, 5),
    (2, 5, 3, 3),
    (2, 6, 4, 4),
    (2, 8, 1, 1),
    (2, 9, 5, 5),
    (2, 10, 3, 3),
    (3, 2, 5, 5),
    (3, 3, 1, 1),
    (3, 4, 5, 5),
    (3, 5, 3, 3),
    (3, 6, 4, 4),
    (3, 7, 5, 5),
    (3, 8, 1, 1),
    (3, 9, 5, 5),
    (3, 10, 3, 3)], cSchema)


In [56]:
input_df = ratings.select("userId")
input_df.toPandas().head(5)

Unnamed: 0,userId
0,0
1,0
2,0
3,0
4,0


In [57]:
input_df.join(reloadUserRecs, "userId").toPandas()

Unnamed: 0,userId,recommendations
0,1,"[(4483, 4.079410552978516), (4679, 3.974742412..."
1,1,"[(4483, 4.079410552978516), (4679, 3.974742412..."
2,1,"[(4483, 4.079410552978516), (4679, 3.974742412..."
3,1,"[(4483, 4.079410552978516), (4679, 3.974742412..."
4,1,"[(4483, 4.079410552978516), (4679, 3.974742412..."
5,1,"[(4483, 4.079410552978516), (4679, 3.974742412..."
6,1,"[(4483, 4.079410552978516), (4679, 3.974742412..."
7,2,"[(222, 4.926865577697754), (1224, 4.9186658859..."
8,2,"[(222, 4.926865577697754), (1224, 4.9186658859..."
9,2,"[(222, 4.926865577697754), (1224, 4.9186658859..."


In [20]:
store_name = "teamtaostorage "
key = "kXp+RFtHAdR4NO53TtyyOcDXvCziwEWT+dYEpvBKIH6k1hQ9+2u4FBMhC/oK/msY/oCBvj+Gr5/PQynyX4rzFQ=="
container = "recommendationhackathon"


spark._jsc.hadoopConfiguration().set('fs.azure.account.key.' + store_name + '.blob.core.windows.net',key)

In [21]:
wasb = "wasb://" + container + "@" + store_name + ".blob.core.windows.net/test.parquet"
userRecs.write.parquet(wasb)

Py4JJavaError: An error occurred while calling o281.parquet.
: org.apache.hadoop.fs.azure.AzureException: org.apache.hadoop.fs.azure.AzureException: Unable to access container recommendationhackathon in account teamtaostorage%20.blob.core.windows.net using anonymous credentials, and no credentials found for them  in the configuration.
	at org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.createAzureStorageSession(AzureNativeFileSystemStore.java:938)
	at org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.initialize(AzureNativeFileSystemStore.java:438)
	at org.apache.hadoop.fs.azure.NativeAzureFileSystem.initialize(NativeAzureFileSystem.java:1048)
	at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2669)
	at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:94)
	at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2703)
	at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2685)
	at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:373)
	at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)
	at org.apache.spark.sql.execution.datasources.DataSource.writeInFileFormat(DataSource.scala:407)
	at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:474)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:48)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:610)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:233)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:217)
	at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:509)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.hadoop.fs.azure.AzureException: Unable to access container recommendationhackathon in account teamtaostorage%20.blob.core.windows.net using anonymous credentials, and no credentials found for them  in the configuration.
	at org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.connectUsingAnonymousCredentials(AzureNativeFileSystemStore.java:735)
	at org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.createAzureStorageSession(AzureNativeFileSystemStore.java:933)
	... 37 more
Caused by: com.microsoft.azure.storage.StorageException: The server encountered an unknown failure: 
	at com.microsoft.azure.storage.StorageException.translateException(StorageException.java:178)
	at com.microsoft.azure.storage.core.ExecutionEngine.executeWithRetry(ExecutionEngine.java:214)
	at com.microsoft.azure.storage.blob.CloudBlobContainer.exists(CloudBlobContainer.java:749)
	at com.microsoft.azure.storage.blob.CloudBlobContainer.exists(CloudBlobContainer.java:736)
	at org.apache.hadoop.fs.azure.StorageInterfaceImpl$CloudBlobContainerWrapperImpl.exists(StorageInterfaceImpl.java:213)
	at org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.connectUsingAnonymousCredentials(AzureNativeFileSystemStore.java:729)
	... 38 more
Caused by: java.net.UnknownHostException: teamtaostorage%20.blob.core.windows.net
	at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:184)
	at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
	at java.net.Socket.connect(Socket.java:589)
	at java.net.Socket.connect(Socket.java:538)
	at sun.net.NetworkClient.doConnect(NetworkClient.java:180)
	at sun.net.www.http.HttpClient.openServer(HttpClient.java:463)
	at sun.net.www.http.HttpClient.openServer(HttpClient.java:558)
	at sun.net.www.http.HttpClient.<init>(HttpClient.java:242)
	at sun.net.www.http.HttpClient.New(HttpClient.java:339)
	at sun.net.www.http.HttpClient.New(HttpClient.java:357)
	at sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(HttpURLConnection.java:1202)
	at sun.net.www.protocol.http.HttpURLConnection.plainConnect0(HttpURLConnection.java:1138)
	at sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:1032)
	at sun.net.www.protocol.http.HttpURLConnection.connect(HttpURLConnection.java:966)
	at sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1546)
	at sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1474)
	at java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:480)
	at com.microsoft.azure.storage.core.ExecutionEngine.executeWithRetry(ExecutionEngine.java:124)
	... 42 more


In [25]:
from azureml.api.schema.dataTypes import DataTypes
from azureml.api.schema.sampleDefinition import SampleDefinition
from azureml.api.realtime.services import generate_schema
import pandas

df = pandas.DataFrame(data=[[3.0]],
                      columns=['userId'])

# Turn on data collection debug mode to view output in stdout
os.environ["AML_MODEL_DC_DEBUG"] = 'true'

# Test the output of the functions

input1 = pandas.DataFrame([[3.0]])

inputs = {"input_df": SampleDefinition(DataTypes.PANDAS, df)}


In [36]:
input1.iloc[0][0]

3.0

In [14]:
userRecs.select("userId", "recommendations.movieId").take(10)

[Row(userId=31, movieId=[1225, 912, 923, 58559, 50, 6350, 5952, 7153, 7361, 1682]),
 Row(userId=65, movieId=[912, 1358, 858, 2064, 2863, 1299, 246, 2971, 1292, 1230]),
 Row(userId=53, movieId=[2433, 1220, 1242, 2202, 1059, 1203, 102753, 71180, 7941, 48682]),
 Row(userId=34, movieId=[923, 3949, 912, 1217, 1225, 3811, 678, 2064, 1208, 1287]),
 Row(userId=28, movieId=[1221, 1267, 858, 541, 1366, 2583, 898, 1193, 306, 58]),
 Row(userId=26, movieId=[27831, 265, 65514, 44195, 59784, 6016, 2571, 58559, 8798, 54286]),
 Row(userId=27, movieId=[589, 1225, 1198, 1136, 923, 1291, 1704, 1224, 73, 2858]),
 Row(userId=44, movieId=[1220, 3, 31435, 65037, 8132, 59684, 76173, 62, 2470, 318]),
 Row(userId=12, movieId=[1884, 3879, 3825, 1215, 3865, 1235, 3863, 5679, 3793, 1220]),
 Row(userId=22, movieId=[589, 4993, 37729, 4226, 3081, 1193, 1884, 32, 1235, 2202])]

In [69]:

# Write configuration
writeConfig = {
"Endpoint" : "https://dcibrecommendationhack.documents.azure.com:443/",
"Masterkey" : "oX6tWPep8FCah8RM258s7cC3x9Kl8tWdbDxmNknXCP34ShW1Ag1ladvb5QWuBmMxuRISBO2HfrRFv3QeJYCSYg==",
"Database" : "recommendation_engine",
"Collection" : "user_recommendations",
"Upsert" : "true"
}
userRecs.select("userId", "recommendations.movieId").write.format("com.microsoft.azure.cosmosdb.spark").options(**writeConfig).save()

Py4JJavaError: An error occurred while calling o444.save.
: java.lang.UnsupportedOperationException: Writing in a non-empty collection.
	at com.microsoft.azure.cosmosdb.spark.DefaultSource.createRelation(DefaultSource.scala:79)
	at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:472)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:48)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:610)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:233)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:748)


In [16]:
MASTER_KEY = 'oX6tWPep8FCah8RM258s7cC3x9Kl8tWdbDxmNknXCP34ShW1Ag1ladvb5QWuBmMxuRISBO2HfrRFv3QeJYCSYg=='
HOST = 'https://dcibrecommendationhack.documents.azure.com:443/'

In [47]:
import pydocumentdb.documents as documents
import pydocumentdb.document_client as document_client
import pydocumentdb.errors as errors
import datetime

client = document_client.DocumentClient(HOST, {'masterKey': MASTER_KEY} )

In [66]:
DATABASE_ID = "recommendation_engine"
COLLECTION_ID = "user_recommendations"
database_link = 'dbs/' + DATABASE_ID
collection_link = database_link + '/colls/' + COLLECTION_ID


def ReadDocument(client, doc_id):
    print('\n1.2 Reading Document by Id\n')

    # Note that Reads require a partition key to be spcified. This can be skipped if your collection is not
    # partitioned i.e. does not have a partition key definition during creation.
    
    doc_link = collection_link + '/docs/' + doc_id
    response = client.ReadDocument(doc_link)

    print('Document read by Id {0}'.format(doc_id))
    print('Account Number: {0}'.format(response.get('account_number')))

def ReadDocuments(client):
    print('\n1.3 - Reading all documents in a collection\n')

    # NOTE: Use MaxItemCount on Options to control how many documents come back per trip to the server
    #       Important to handle throttles whenever you are doing operations such as this that might
    #       result in a 429 (throttled request)
    documentlist = list(client.ReadDocuments(collection_link), {'maxItemCount':10})

    print('Found {0} documents'.format(documentlist.__len__()))

    for doc in documentlist:
        print('Document Id: {0}'.format(doc.get('id')))


In [61]:
db = client.ReadDatabase(database_link)

In [68]:
collection = client.ReadCollection(collection_link=collection_link)

In [84]:
# Query them in SQL
query = { 'query': 'SELECT * FROM server s WHERE s.userId = 37' }    

options = {} 

result_iterable = client.QueryDocuments(collection['_self'], query, options)
results = list(result_iterable);

print(results)

[{'id': 'e4924b89-2db0-4dfa-a182-2c4a6b68d6f3', '_ts': 1518453027, '_attachments': 'attachments/', 'movieId': [1204, 497, 1207, 1784, 2997, 923, 1186, 3298, 3006, 3996], '_rid': 'iqYGAJ0pMAABAAAAAAAAAA==', '_etag': '"00005401-0000-0000-0000-5a81c1230000"', 'userId': 37, '_self': 'dbs/iqYGAA==/colls/iqYGAJ0pMAA=/docs/iqYGAJ0pMAABAAAAAAAAAA==/'}]
