
Issue #LR-676 feat: Ownership Transfer Report #102

Merged
Changes from all commits (47 commits)
fd633ae
Issue #LR-676 feat: User Delete reports
BharathwajShankar Feb 16, 2024
8efa5fe
Issue #LR-676 feat: User Delete reports
BharathwajShankar Feb 19, 2024
56dd7ea
Issue #LR-676 feat: User Delete reports
BharathwajShankar Feb 27, 2024
7839e6c
Issue #LR-676 feat: User Delete reports
BharathwajShankar Feb 28, 2024
44261c4
Issue #LR-676 feat: User Delete reports bug fix
BharathwajShankar Feb 28, 2024
f2605f3
Issue #LR-676 feat: User Delete reports bug fix
BharathwajShankar Feb 28, 2024
5ef124a
Issue #LR-676 feat: User Delete reports bug fix
BharathwajShankar Feb 28, 2024
aaa7ed6
Merge branch 'release-7.0.0' of https://github.com/Sunbird-Lern/data-…
BharathwajShankar Mar 21, 2024
ea47832
Issue #LR-676 feat:user delete reports lern model fix
BharathwajShankar Mar 21, 2024
c883bf1
Issue #LR-676 feat:fix related to cloud storage endpoint
BharathwajShankar Mar 21, 2024
d37a5ab
Issue #LR-676 feat:fix related to cloud storage endpoint removed new …
BharathwajShankar Mar 21, 2024
d7d2a00
Issue #LR-676 feat: delete user report spark fix
BharathwajShankar Mar 21, 2024
47dcfa2
Issue #LR-676 feat: delete user report keyspace fix
BharathwajShankar Mar 21, 2024
b829986
Issue #LR-676 feat:added custom cassandra host
BharathwajShankar Mar 22, 2024
44e56c8
Issue #LR-676 Adding ingressip to delete user assets report
BharathwajShankar Mar 25, 2024
b8fe30f
Merge branch 'release-7.0.0' of https://github.com/Sunbird-Lern/data-…
BharathwajShankar Mar 26, 2024
9eb2742
Issue #LR-676 feat:adding private ingress ip for checking
BharathwajShankar Mar 26, 2024
f0267eb
Issue #LR-676 Adding logs to debug url issue
BharathwajShankar Mar 26, 2024
42cb317
Issue #LR-676 Adding logs to debug url issue
BharathwajShankar Mar 26, 2024
6391611
Issue #LR-676 Adding logs to debug url issue
BharathwajShankar Mar 26, 2024
bef97b9
Issue #LR-676 Adding logs to debug url issue
BharathwajShankar Mar 26, 2024
7db27a5
Issue #LR-676 Adding logs to debug url issue
BharathwajShankar Mar 26, 2024
5c95268
Issue #LR-676 Adding direct url
BharathwajShankar Mar 26, 2024
386ed26
Issue #LR-676 feat:adding savetoblob logic
BharathwajShankar Mar 27, 2024
d68d864
Issue #LR-676 feat:adding savetoblob logic
BharathwajShankar Mar 27, 2024
4c0bab8
Issue #LR-676 feat:adding savetoblob logic with root org id
BharathwajShankar Mar 27, 2024
374fc0a
Issue #LR-676 feat:adding savetoblob logic with root org id
BharathwajShankar Mar 27, 2024
15139a4
Issue #LR-676 feat:adding savetoblob logic with root org id
BharathwajShankar Mar 27, 2024
c44184b
Issue #LR-676 decrypting username
BharathwajShankar Apr 2, 2024
b663d16
Issue #LR-676 decrypting username and storing back to username itself
BharathwajShankar Apr 2, 2024
038c6fe
Issue #LR-676 reading url from conf
BharathwajShankar Apr 2, 2024
54929cc
Issue #LR-676 fixing url
BharathwajShankar Apr 2, 2024
b0bf09c
Issue #LR-676 fixing url
BharathwajShankar May 2, 2024
13a07c6
Merge branch 'release-8.0.0' of https://github.com/Sunbird-Lern/data-…
BharathwajShankar May 2, 2024
de0768b
Issue #LR-676 feat:hardcoded cloud store version
BharathwajShankar May 2, 2024
3c9c275
Issue #LR-676 feat:hardcoded cloud store version
BharathwajShankar May 2, 2024
a70538c
Issue #LR-676 feat:hardcoded cloud store version
BharathwajShankar May 2, 2024
c1d2b59
Issue #LR-676 feat:hardcoded cloud store version
BharathwajShankar May 2, 2024
1234eea
Issue #LR-676 feat:hardcoded cloud store version
BharathwajShankar May 2, 2024
201f735
Merge branch 'release-8.0.0' of https://github.com/Sunbird-Lern/data-…
BharathwajShankar May 3, 2024
2e3fd80
Issue #LR-676 fixing testcase for data products
BharathwajShankar May 7, 2024
cfccb92
Issue #LR-676 feat:hardcoded cloud store version
BharathwajShankar May 7, 2024
e9bc85b
Issue #LR-676 feat:hardcoded cloud store version
BharathwajShankar May 7, 2024
ad62541
Issue #LR-676 feat:conf changes
BharathwajShankar May 7, 2024
27ee3e5
Issue #LR-676 fixing testcase for data products
BharathwajShankar May 7, 2024
9919b5c
Issue #LR-676 fixing testcase for data products
BharathwajShankar May 7, 2024
81704f2
Issue #LR-676 fixing testcase for data products
BharathwajShankar May 7, 2024
4 changes: 0 additions & 4 deletions ansible/inventory/env/group_vars/all.yml
@@ -61,10 +61,6 @@ postgres_address_space: 0.0.0.0/0 # Postgres trust address space
lp_composite_search: "http://{{ groups['composite-search-cluster'][0] }}:9200" # Composite Cluster ip of LP
lp_composite_search_host: "{{ groups['composite-search-cluster'][0] }}"
lp_search: "http://{{private_ingressgateway_ip}}/search"
service:
  search:
    url: http://{{private_ingressgateway_ip}}/search
    path: /v3/search

default_channel: "{{default_org_hash_id}}"

@@ -2,8 +2,10 @@ application.env="{{ env }}"
telemetry.version="2.1"
default.parallelization="10"
spark_output_temp_dir="/mount/data/analytics/tmp/"
service.search.url="{{ service.search.url }}"
service.search.path="{{ service.search.path }}"
service.search.url="http://{{private_ingressgateway_ip}}/search"
service.search.path="/v3/search"
lms.service.url="http://{{private_ingressgateway_ip}}/lms"
lms.batch.search.path="/v1/course/batch/search"
spark.cassandra.connection.host="{{groups['dp-cassandra'][0]}}"
cassandra.keyspace_prefix="{{ cassandra_keyspace_prefix }}"

@@ -24,9 +24,9 @@ config() {


if [ -z "$2" ]; then endDate=$(date --date yesterday "+%Y-%m-%d"); else endDate=$2; fi
if [ ! -z "$3" ]; then specificUserId=$3; fi #used as specfic user id in usercacheindxerjob
if [ ! -z "$3" ]; then specificUserId=$3; configuredUserId=$3; fi #used as specfic user id in usercacheindxerjob,used as configured user id in deleteusersassetsreportjob
if [ ! -z "$4" ]; then populateAnonymousData=$4; fi #used as populateAnonymousData flag in usercacheindxerjob
if [ ! -z "$5" ]; then refreshUserData=$5; fi #used as refreshUserData flag in usercacheindxerjob
if [ ! -z "$5" ]; then refreshUserData=$5; configuredChannel=$5;fi #used as refreshUserData flag in usercacheindxerjob,used as configured channel flag in deleteusersassetsreportjob
case "$1" in
"assessment-correction")
echo '{"search":{"type":"{{dp_object_store_type}}","queries":[{"bucket":"'$bucket'","prefix":"unique/raw/","endDate":"'$endDate'","delta":0}]},"model":"org.sunbird.analytics.model.report.AssessmentCorrectionModel","modelParams":{"parallelization":200,"druidConfig":{"queryType":"groupBy","dataSource":"content-model-snapshot","intervals":"1901-01-01T00:00:00+00:00/2101-01-01T00:00:00+00:00","granularity":"all","aggregations":[{"name":"count","type":"count","fieldName":"count"}],"dimensions":[{"fieldName":"identifier","aliasName":"identifier"}],"filters":[{"type":"equals","dimension":"contentType","value":"SelfAssess"}],"descending":"false"},"fileOutputConfig":{"to":"file","params":{"file":"{{ analytics.home }}/assessment-correction/skippedEvents"}},"sparkCassandraConnectionHost":"'$sunbirdPlatformCassandraHost'"},"output":[{"to":"kafka","params":{"brokerList":"'$brokerIngestionList'","topic":"'$assessTopic'"}}],"parallelization":200,"appName":"Assessment Correction Model"}'
@@ -94,6 +94,9 @@ config() {
"program-user-exhaust")
echo '{"search":{"type":"none"},"model":"org.sunbird.ml.exhaust.ProgramUserInfoExhaustJob","modelParams":{"store":"azure","mode":"OnDemand","authorizedRoles":["PROGRAM_MANAGER"],"id":"ml-program-user-exhaust","keyspace_name":"sunbird_programs","table":[{"columns":["user_id","program_name","program_externalId","user_locations","user_type","user_sub_type","organisation_name","pii_consent_required"],"name":"program_enrollment","user_locations_columns":["state_name","district_name","block_name","cluster_name","school_code","school_name"]},{"name":"user","columns":["userid","firstname","lastname","email","phone","username"],"encrypted_columns":["email","phone"],"final_columns":["email","phone","username"]}],"label_mapping":{"user_id":"User UUID","username":"User Name(On user consent)","phone":"Mobile number(On user consent)","email":"Email ID(On user consent)","consentflag":"Consent Provided","consentprovideddate":"Consent Provided Date","program_name":"Program Name","program_externalId":"Program ID","state_name":"State","district_name":"District","block_name":"Block","cluster_name":"Cluster","school_code":"School Id","school_name":"School Name","user_type":"Usertype","user_sub_type":"Usersubtype","organisation_name":"Org Name"},"order_of_csv_column":["User UUID","User Name(On user consent)","Mobile number(On user consent)","Email ID(On user consent)","Consent Provided","Consent Provided Date","Program Name","Program ID","State","District","Block","Cluster","School Id","School Name","Usertype","Usersubtype","Org Name"],"sort":["District","Block","Cluster","School Id","User UUID"],"quote_column":["User Name(On user consent)","Program Name"],"sparkElasticsearchConnectionHost":"{{ sunbird_es_host }}","sparkRedisConnectionHost":"{{ metadata2_redis_host }}","sparkUserDbRedisIndex":"12","sparkCassandraConnectionHost":"{{ core_cassandra_host }}","sparkUserDbRedisPort":"{{ user_port }}","fromDate":"$(date --date yesterday '+%Y-%m-%d')","toDate":"$(date --date yesterday '+%Y-%m-%d')","key":"ml_reports/","format":"csv"},"output":[{"params":{"file":"ml_reports/"},"to":"file"}],"parallelization":8,"appName":"Program UserInfo Exhaust"}'
;;
"delete-users-assets-report-job")
echo '{"search":{"type":"none"},"model":"org.sunbird.userorg.job.report.DeletedUsersAssetsReportJob","modelParams":{"store":"{{dp_object_store_type}}","storageKeyConfig":"storage.key.config","storageSecretConfig":"storage.secret.config","storageContainer":"{{reports_container}}","storageEndpoint":"{{dp_storage_endpoint_config}}","configuredUserId":'$configuredUserId',"configuredOrganisationId":[],"configuredChannel":'$configuredChannel', "sparkCassandraConnectionHost":"{{ core_cassandra_host }}"}}'
;;
"*")
echo "Unknown model code"
exit 1 # Command to come out of the program with status 1
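Note that the new case splices $configuredUserId and $configuredChannel into the JSON unquoted, so callers of this script are expected to pass JSON arrays (for example '["user-001"]'). A minimal sketch of how the emitted config is read back, mirroring the deserialization in DeletedUsersAssetsReportJob below; the sample ids are hypothetical:

import org.ekstep.analytics.framework.JobConfig
import org.ekstep.analytics.framework.util.JSONUtils

object ConfigShapeSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical config string in the shape emitted by the script above
    val sample = """{"search":{"type":"none"},"model":"org.sunbird.userorg.job.report.DeletedUsersAssetsReportJob","modelParams":{"configuredUserId":["user-001"],"configuredOrganisationId":[],"configuredChannel":["channel-001"],"sparkCassandraConnectionHost":"localhost"}}"""
    val jobConfig = JSONUtils.deserialize[JobConfig](sample)
    // The JSON arrays deserialize to List[String], which is what the job expects
    val userIds = jobConfig.modelParams.get("configuredUserId").asInstanceOf[List[String]]
    println(userIds) // List(user-001)
  }
}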
@@ -59,6 +59,8 @@ get_report_job_model_name(){
;;
"program-user-exhaust") echo 'org.sunbird.ml.exhaust.ProgramUserInfoExhaustJob'
;;
"delete-users-assets-report-job") echo 'org.sunbird.userorg.job.report.DeletedUsersAssetsReportJob'
;;
*) echo $1
;;
esac
9 changes: 6 additions & 3 deletions lern-data-products/src/main/resources/application.conf
@@ -63,8 +63,10 @@ data_exhaust {

cloud_storage_type="local"

service.search.url="https://dev.open-sunbird.org/api/composite"
service.search.path="/v1/search"
service.search.url="https://dev.sunbirded.org/action/composite"
service.search.path="/v3/search"
lms.service.url="https://dev.sunbirded.org/api"
lms.batch.search.path="/course/v1/batch/list"

## Reports - Global config
cloud.container.reports="reports"
@@ -155,4 +157,5 @@
org.search.private.api.url="{{sunbird_learner_service_url}}/private/v2/org/search"
tenant.pref.read.private.api.url="{{sunbird_learner_service_url}}/private/v2/org/preferences/read"

sunbird_instance_name="Sunbird"
sunbird_instance_name="Sunbird"
delete.user.cloud.objectKey="reports/"
@@ -24,7 +24,9 @@ object Constants {
val EXPERIMENT_DEFINITION_TABLE = "experiment_definition";

val SEARCH_SERVICE_URL = AppConf.getConfig("service.search.url")
var COURSE_BATCH_URL: String = AppConf.getConfig("lms.service.url")
val COMPOSITE_SEARCH_URL = s"$SEARCH_SERVICE_URL" + AppConf.getConfig("service.search.path")
val COURSE_BATCH_SEARCH_URL = s"$COURSE_BATCH_URL" + AppConf.getConfig("lms.batch.search.path")
val TENANT_PREFERENCE_PRIVATE_READ_URL = AppConf.getConfig("tenant.pref.read.private.api.url")
val ORG_PRIVATE_SEARCH_URL: String = AppConf.getConfig("org.search.private.api.url")
val TEMP_DIR = AppConf.getConfig("spark_output_temp_dir")
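With the defaults added to application.conf above, the two search endpoints compose as follows; a small sketch, assuming the conf values shown in this diff:

import org.ekstep.analytics.framework.conf.AppConf

object EndpointSketch {
  def main(args: Array[String]): Unit = {
    // "https://dev.sunbirded.org/action/composite" + "/v3/search"
    println(AppConf.getConfig("service.search.url") + AppConf.getConfig("service.search.path"))
    // "https://dev.sunbirded.org/api" + "/course/v1/batch/list"
    println(AppConf.getConfig("lms.service.url") + AppConf.getConfig("lms.batch.search.path"))
  }
}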
@@ -38,6 +38,8 @@ case class CollectionDetails(result: Map[String, AnyRef])
case class CollectionInfo(channel: String, identifier: String, name: String, userConsent: Option[String], status: String)
case class Metrics(totalRequests: Option[Int], failedRequests: Option[Int], successRequests: Option[Int], duplicateRequests: Option[Int])
case class ProcessedRequest(channel: String, batchId: String, filePath: String, fileSize: Long)
case class DeleteCollectionInfo(identifier: String, userId: String, name: String, objectType: String, status: String)
case class CourseBatch(identifier: String, userId: String, name: String, status: String)

trait BaseCollectionExhaustJob extends BaseReportsJob with IJob with OnDemandExhaustJob with Serializable {

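The two new case classes act as row schemas when the report job (added below) converts API responses into DataFrames. A minimal sketch of that conversion; the local session and sample values are hypothetical:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.lit
import org.sunbird.lms.exhaust.collection.CourseBatch

object CourseBatchDfSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("sketch").getOrCreate()
    // A batch row in the shape returned by the LMS batch-search API
    val batches = Seq(CourseBatch("do_123", "user-001", "Sample Batch", "1"))
    val df = spark.createDataFrame(batches).withColumn("objectType", lit("Course Batch"))
    df.show() // columns: identifier, userId, name, status, objectType
    spark.stop()
  }
}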
@@ -0,0 +1,236 @@
package org.sunbird.userorg.job.report

import org.apache.spark.SparkContext
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.functions.{col, lit, udf, when}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql._
import org.ekstep.analytics.framework.JobDriver.className
import org.ekstep.analytics.framework.conf.AppConf
import org.ekstep.analytics.framework.util.DatasetUtil.extensions
import org.ekstep.analytics.framework.util.{JSONUtils, JobLogger, RestUtil}
import org.ekstep.analytics.framework.{FrameworkContext, IJob, JobConfig}
import org.sunbird.core.util.{Constants, DecryptUtil}
import org.sunbird.lms.exhaust.collection.{CollectionDetails, CourseBatch, DeleteCollectionInfo}
import org.sunbird.lms.job.report.BaseReportsJob

import java.text.SimpleDateFormat
import java.util.Date

object DeletedUsersAssetsReportJob extends IJob with BaseReportsJob with Serializable {

  override def main(config: String)(implicit sc: Option[SparkContext], fc: Option[FrameworkContext]): Unit = {
    val jobConfig = JSONUtils.deserialize[JobConfig](config)
    val configuredUserId: List[String] = jobConfig.modelParams.get("configuredUserId").asInstanceOf[List[String]]
    val configuredChannel: List[String] = jobConfig.modelParams.get("configuredChannel").asInstanceOf[List[String]]
    JobLogger.init(name())
    JobLogger.start("Started executing", Option(Map("config" -> config, "model" -> name())))
    val spark = openSparkSession(jobConfig)
    implicit val stringEncoder: Encoder[String] = ExpressionEncoder[String]
    val deletedUsersDF = fetchDeletedUsers(spark)
    // Report on the configured user ids when supplied, otherwise on every deleted user
    val userIds: List[String] = if (configuredUserId.nonEmpty) configuredUserId else getUserIdsFromDeletedUsers(deletedUsersDF)
    val channels: List[String] = if (configuredChannel.nonEmpty) configuredChannel else List.empty[String]
    System.out.println(deletedUsersDF.count())
    deletedUsersDF.show()
    val contentAssetsDF = fetchContentAssets(userIds, channels)(spark)
    val courseAssetsDF = fetchCourseAssets(userIds, channels)(spark)
    val renamedDeletedUsersDF = deletedUsersDF
      .withColumnRenamed("id", "userIdAlias")
      .withColumnRenamed("username", "usernameAlias")
      .withColumnRenamed("rootorgid", "organisationIdAlias")
    // Join deleted users with content assets
    val joinedContentDF = renamedDeletedUsersDF.join(contentAssetsDF, renamedDeletedUsersDF("userIdAlias") === contentAssetsDF("userId"), "inner")
    // Join deleted users with course batch assets
    val joinedCourseDF = renamedDeletedUsersDF.join(courseAssetsDF, renamedDeletedUsersDF("userIdAlias") === courseAssetsDF("userId"), "inner")
    // Flatten the roles array into a comma-separated string
    val concatRoles = udf((roles: Any) => {
      roles match {
        case s: Seq[_] => s.mkString(", ")
        case _ => roles.toString
      }
    })
    // Select columns for the final output without using collect_list in a UDF
    val userCols = Seq(
      renamedDeletedUsersDF("userIdAlias").alias("userId"),
      renamedDeletedUsersDF("usernameAlias").alias("username"),
      renamedDeletedUsersDF("organisationIdAlias").alias("organisationId"),
      concatRoles(renamedDeletedUsersDF("roles")).alias("roles")
    )
    // Select columns for content assets
    val contentCols = Seq(
      contentAssetsDF("identifier").alias("assetIdentifier"),
      contentAssetsDF("name").alias("assetName"),
      contentAssetsDF("status").alias("assetStatus"),
      contentAssetsDF("objectType")
    )
    // Select columns for course batch assets, mapping the numeric batch status to a label
    val courseCols = Seq(
      courseAssetsDF("identifier").alias("assetIdentifier"),
      courseAssetsDF("name").alias("assetName"),
      when(courseAssetsDF("status") === "0", "Upcoming Batch")
        .when(courseAssetsDF("status") === "1", "Ongoing Batch")
        .when(courseAssetsDF("status") === "2", "Batch ended").alias("assetStatus"),
      courseAssetsDF("objectType")
    )
    // Combine content and course batch rows using unionAll
    val combinedDF = joinedContentDF.select(userCols ++ contentCols: _*).unionAll(
      joinedCourseDF.select(userCols ++ courseCols: _*)
    )
    // Deduplicate the combined rows
    val finalDF = combinedDF.distinct()
    val decryptUsernameUDF = udf((encryptedUsername: String) => {
      DecryptUtil.decryptData(encryptedUsername)
    })
    val decryptedFinalDF = finalDF.withColumn("username", decryptUsernameUDF(finalDF("username")))
    decryptedFinalDF.show()
    val container = AppConf.getConfig("cloud.container.reports")
    val objectKey = AppConf.getConfig("delete.user.cloud.objectKey")
    val storageConfig = getStorageConfig(container, objectKey, jobConfig)
    val formattedDate: String = new SimpleDateFormat("yyyyMMdd").format(new Date())
    // Save the report with decrypted usernames, partitioned by organisationId
    decryptedFinalDF.saveToBlobStore(storageConfig, "csv", s"delete_user_$formattedDate", Option(Map("header" -> "true")), Option(Seq("organisationId")))
  }
  def name(): String = "DeletedUsersAssetsReportJob"

  def fetchContentAssets(userIds: List[String], channels: List[String])(implicit spark: SparkSession): DataFrame = {
    System.out.println("inside content assets")
    val apiURL = Constants.COMPOSITE_SEARCH_URL
    val limit = 10000 // Set the desired limit for each request
    var offset = 0
    var totalRecords = 0

    var contentDf: DataFrame = spark.createDataFrame(spark.sparkContext.emptyRDD[Row],
      StructType(Seq(
        StructField("identifier", StringType),
        StructField("userId", StringType),
        StructField("name", StringType),
        StructField("objectType", StringType),
        StructField("status", StringType)
      ))
    )

    do {
      val requestMap = Map(
        "request" -> Map(
          "filters" -> Map(
            "createdBy" -> userIds,
            "channel" -> channels,
            "status" -> Array("Live", "Draft", "Review", "Unlisted")
          ),
          "fields" -> Array("identifier", "createdBy", "name", "objectType", "status", "lastUpdatedOn"),
          "sortBy" -> Map("createdOn" -> "Desc"),
          "offset" -> offset,
          "limit" -> limit
        )
      )

      val request = JSONUtils.serialize(requestMap)
      val response = RestUtil.post[CollectionDetails](apiURL, request).result
      val count = response.getOrElse("count", 0).asInstanceOf[Int]

      // Process each key in the result map
      response.asInstanceOf[Map[String, Any]].foreach {
        case (key, value) =>
          value match {
            case list: List[Map[String, Any]] =>
              // Process each entry in the list
              val entries = list.map(entry =>
                DeleteCollectionInfo(
                  entry.getOrElse("identifier", "").toString,
                  entry.getOrElse("createdBy", "").toString,
                  entry.getOrElse("name", "").toString,
                  entry.getOrElse("objectType", "").toString,
                  entry.getOrElse("status", "").toString
                )
              )
              // Create a DataFrame from the entries
              val entryDF = spark.createDataFrame(entries)
                .withColumnRenamed("createdBy", "userId")
                .select("identifier", "userId", "name", "objectType", "status")
              // Union with the existing contentDf
              contentDf = contentDf.union(entryDF)

            case _ => // Ignore other types, such as the scalar "count" value
          }
      }

      totalRecords = count
      offset += limit
    } while (offset < totalRecords)

    contentDf.show()
    System.out.println(contentDf.count())
    contentDf
  }

  def getUserIdsFromDeletedUsers(df: DataFrame)(implicit enc: Encoder[String]): List[String] = {
    df.select("id").as[String](enc).collect().toList
  }

  def fetchDeletedUsers(implicit spark: SparkSession): DataFrame = {
    val sunbirdKeyspace = AppConf.getConfig("sunbird.user.keyspace")
    // status = 2 marks a deleted user in the Sunbird user table
    loadData(spark, Map("table" -> "user", "keyspace" -> sunbirdKeyspace), None).select(
      col("id"),
      col("username"),
      col("rootorgid"),
      col("roles"),
      col("status")).filter("status = 2").persist()
  }

  def fetchCourseAssets(userIds: List[String], channels: List[String])(implicit spark: SparkSession): DataFrame = {
    System.out.println("inside course assets")
    val apiUrl = Constants.COURSE_BATCH_SEARCH_URL
    val limit = 10000 // Set the desired limit for each request
    var offset = 0
    var totalRecords = 0

    var courseDataDF = spark.createDataFrame(spark.sparkContext.emptyRDD[Row],
      StructType(Seq(StructField("identifier", StringType), StructField("userId", StringType),
        StructField("name", StringType), StructField("status", StringType), StructField("objectType", StringType)))
    )

    do {
      val requestMap = Map(
        "request" -> Map(
          "filters" -> Map(
            "createdBy" -> userIds,
            "createdFor" -> channels,
            "status" -> 1),
          "fields" -> Array("identifier", "name", "createdBy", "status"),
          "sortBy" -> Map("createdOn" -> "Desc"),
          "offset" -> offset,
          "limit" -> limit
        )
      )

      val request = JSONUtils.serialize(requestMap)
      val response = RestUtil.post[CollectionDetails](apiUrl, request).result
      val responseMap = response.getOrElse("response", Map.empty).asInstanceOf[Map[String, Any]]
      val count = responseMap.getOrElse("count", 0).asInstanceOf[Int]
      val content = responseMap.getOrElse("content", List.empty).asInstanceOf[List[Map[String, Any]]]

      if (content.nonEmpty) {
        val courses = content.map(entry => CourseBatch(
          entry("identifier").toString,
          entry("createdBy").toString,
          entry("name").toString,
          entry("status").toString
        ))
        val coursesDF = spark.createDataFrame(courses)
          .withColumnRenamed("createdBy", "userId")
          .select("identifier", "userId", "name", "status")
          .withColumn("objectType", lit("Course Batch"))
        courseDataDF = courseDataDF.union(coursesDF)
      }

      totalRecords = count
      offset += limit
    } while (offset < totalRecords)

    courseDataDF.show()
    System.out.println(courseDataDF.count())
    courseDataDF
  }
}
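A sketch of a local smoke run for the new job. It assumes that passing None for both contexts lets the job open its own Spark session (as with other jobs in this framework), and that a Cassandra instance is reachable at the configured host; the config values are illustrative:

import org.apache.spark.SparkContext
import org.ekstep.analytics.framework.FrameworkContext
import org.sunbird.userorg.job.report.DeletedUsersAssetsReportJob

object DeletedUsersReportSmokeRun {
  def main(args: Array[String]): Unit = {
    implicit val sc: Option[SparkContext] = None
    implicit val fc: Option[FrameworkContext] = None
    // Empty configuredUserId/configuredChannel: report on all deleted users
    val config = """{"search":{"type":"none"},"model":"org.sunbird.userorg.job.report.DeletedUsersAssetsReportJob","modelParams":{"configuredUserId":[],"configuredOrganisationId":[],"configuredChannel":[],"sparkCassandraConnectionHost":"localhost"}}"""
    DeletedUsersAssetsReportJob.main(config)
  }
}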
3 changes: 3 additions & 0 deletions lern-data-products/src/test/resources/application.conf
@@ -134,3 +134,6 @@ framework_read_api = "/api/framework/v1/read/"
taxonomy.basePath = "http://localhost:9100"
redis.user.database.index=0
sunbird_instance_name="Sunbird"

lms.service.url="http://localhost:9080/lms"
lms.batch.search.path="/v1/course/batch/search"
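A quick sanity check that these test keys resolve from the test classpath; a sketch using Typesafe Config directly (that AppConf wraps Typesafe Config is an assumption here):

import com.typesafe.config.ConfigFactory

object TestConfSanityCheck {
  def main(args: Array[String]): Unit = {
    val conf = ConfigFactory.load() // picks up src/test/resources/application.conf on the test classpath
    assert(conf.getString("lms.service.url") == "http://localhost:9080/lms")
    assert(conf.getString("lms.batch.search.path") == "/v1/course/batch/search")
    println(conf.getString("lms.service.url") + conf.getString("lms.batch.search.path"))
  }
}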